Flink流处理-DataStream常用Source及Sink
程序员文章站
2022-06-17 09:36:03
...
环境
flink-1.9.0
一、需要的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.0</version>
</dependency>
二、初始化执行环境读取数据文件
数据文件word
how are you
world and that
hello world
jack and
app storm storm what
spark spark
初始化流处理执行环境
/**
* 初始化流处理执行环境
*/
private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
三、常用Source
1.socketTextStream
从socket套接字中获取数据
/**
* 从socket套接字中获取数据
*/
public static void socketTextStream() throws Exception {
// 从本地socket套接字中读取数据
DataStreamSource<String> dataStream = env.socketTextStream("localhost", 9999);
// 打印输入的内容
dataStream.print();
// 执行任务
env.execute();
}
打开CMD,运行netcat,输入nc -l -p 9999后运行该程序,等待连接完成后输入一些内容,可以看到在控制台中输出了socket中输入的内容
2.readTextFile(path)
从文本文件中读取数据作为数据源
/**
* 从文本文件中读取数据作为数据源
* 注意:文件可以是本地文件,也可以是hdfs中的文件,只需要指定对应的路径即可
*/
public static void readTextFile() throws Exception {
// 从本地文本文件中读取数据
DataStreamSource<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
// 从hdfs文件系统中读取数据
//DataStreamSource<String> dataStream = env.readTextFile("hdfs://master:9000/word");
// 将文本中每行单词切分成单个单词并收集
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] words = s.split("\t");
for(String word: words){
collector.collect(word);
}
}
}).print();
// 执行任务
env.execute();
}
运行结果
3.generateSequence(from, to)
将序列作为数据源
/**
* 从生成序列中读取数据作为数据源
*/
public static void generateSequence() throws Exception {
// 设置并行度,默认为CPU核心数,这里设置为1可以防止输出乱序
env.setParallelism(1);
// 生成1-10的序列并输出
env.generateSequence(1, 10).print();
// 执行任务
env.execute();
}
运行结果
4.fromCollection(Seq)
将集合中的数据当作数据源
/**
* 从Java.util.Collection集合中读取数据作为数据源
*/
public static void fromCollection() throws Exception {
ArrayList<String> list = new ArrayList<>(5);
list.add("flink");
list.add("scala");
list.add("spark");
list.add("hadoop");
list.add("hive");
env.fromCollection(list).print();
// 执行任务
env.execute();
}
运行结果
5.fromElements(elements: _*)
将一堆元素作为数据源
/**
* 从Java.util.Collection集合中读取数据作为数据源
*/
public static void fromElements() throws Exception {
env.fromElements("flink", "scala", "spark", "hadoop", "hive").print();
// 执行任务
env.execute();
}
运行结果
6.addSource
自定义数据源
MySQL表中的数据
1 hello 1
2 hi 3
3 flink 1
4 scala 1
5 spark 1
6 hadoop 1
7 hive 1
Mysql自定义数据源类
package cn.myclass.stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
* 自定义mysql数据源
* 可以通过实现SourceFunction接口实现,也可以通过继承RichSourceFunction类
* 实现,两者的区别为后者提供的方法更多更丰富。如果只有简单的逻辑则使用前者
* 即可。这里由于需要连接数据库,所以使用了RichSourceFunction,借助类中提供
* 的open以及close方法更加合理的利用资源读取数据。
* @author Yang
*/
public class MysqlDataSource extends RichSourceFunction<String> {
/**
* 预处理对象
*/
private PreparedStatement preparedStatement = null;
/**
* 连接对象
*/
private Connection connection = null;
/**
* 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。
* @param parameters 参数信息
*/
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
//创建连接
connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/flink",
"root",
"root");
// 从word表中读取所有单词
String sql = "select word from word";
// 获得预处理对象
preparedStatement = connection.prepareStatement(sql);
}
/**
* 读取数据时执行此方法,从查询结果中依次获得单词
* @param sourceContext 数据源上下文对象
*/
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
// 执行查询获得结果
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()){
// 将结果添加到收集器中
sourceContext.collect(resultSet.getString("word"));
}
}
/**
* 取消任务时执行
*/
@Override
public void cancel() {
}
/**
* 关闭时的方法,关闭MySQL连接,避免资源占用
*/
@Override
public void close() throws Exception {
if (preparedStatement != null){
preparedStatement.close();
}
if (connection != null){
connection.close();
}
}
}
调用方法
/**
* 从自定义数据源中读取数据
*/
public static void addSource() throws Exception {
// 添加自定义数据源并打印读取的数据
env.addSource(new MysqlDataSource()).print();
// 执行任务
env.execute();
}
运行结果
四、常用Sink
1.print
测试时常用的方法,将结果直接输出到标准输出设备(控制台)。略
2.writeAsText
保存为文本文件
/**
* 将结果写入到文本文件中
*/
public static void writeAsText() throws Exception {
// 从本地文本文件中读取数据
DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
// 设置并行度为1,将结果写入到一个文件中
env.setParallelism(1);
// 将结果写入到hdfs中
//dataStream.writeAsText("hdfs://master:9000/words.txt");
// 将结果写到本地文本文件中
dataStream.writeAsText("FlinkModule/src/main/resources/stream/words.txt");
// 执行任务
env.execute();
}
运行结果
3.writeAsCsv
保存为csv文件
/**
* 将结果写入到csv文件中
* 注意:将结果写入到csv只支持元组类型的数据,所以在这里将结果转化成了元组并无实际意义
*/
public static void writeAsCsv() throws Exception {
// 从本地文本文件中读取数据
DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
// 设置并行度
env.setParallelism(1);
// 将单词转化成元组
dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = s.split("\t");
for(String word: words){
collector.collect(new Tuple2<>(word, 1));
}
}
}).writeAsCsv("FlinkModule/src/main/resources/stream/words.csv");
// 执行任务
env.execute();
}
运行结果
4.writeToSocket
输出到套接字
/**
* 将结果写入到socket套接字中
*/
public static void writeToSocket() throws Exception {
// 从本地文本文件中读取数据
DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
// 将结果写入到socket套接字中,以简单字符串类型发送
dataStream.writeToSocket("localhost", 9999, new SimpleStringSchema());
// 执行任务
env.execute();
}
打开CMD,运行netcat,输入nc -lp 9999等待接收运行结果后执行该方法,可以看到nc中输出的结果
5.addSink
自定义Mysql数据沉槽类
package cn.myclass.stream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
* 自定义Mysql数据沉槽
* @author Yang
*/
public class MysqlDataSink extends RichSinkFunction<Tuple2<String, Integer>> {
/**
* 预处理对象
*/
private PreparedStatement preparedStatement = null;
/**
* 连接对象
*/
private Connection connection = null;
/**
* 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。
* @param parameters 参数信息
*/
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
// 创建连接
connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/flink",
"root",
"root");
// 从word表中读取所有单词
String sql = "insert into word(word,count) values(?,?)";
// 预编译语句并获得预处理对象
preparedStatement = connection.prepareStatement(sql);
}
/**
* 每条结果执行的方法
* @param tuple2 元组数据
* @param context 上下文
*/
@Override
public void invoke(Tuple2<String, Integer> tuple2, Context context) throws Exception {
// 设置sql语句中的第一个和第二个值
preparedStatement.setString(1, tuple2.f0);
preparedStatement.setInt(2, tuple2.f1);
// 执行插入
preparedStatement.executeUpdate();
}
/**
* 关闭时的方法,关闭MySQL连接,避免资源占用
*/
@Override
public void close() throws Exception {
if (preparedStatement != null){
preparedStatement.close();
}
if (connection != null){
connection.close();
}
}
}
调用方法
/**
* 将结果写入自定义数据沉槽
*/
public static void addSink() throws Exception {
// 从本地文本文件中读取数据
DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
// 将单词形成元组并设置次数为1
dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = s.split("\t");
for(String word: words){
collector.collect(new Tuple2<>(word, 1));
}
}
// 写入MySql中
}).addSink(new MysqlDataSink());
// 执行任务
env.execute();
}
运行结果
五、完整代码
自定义Mysql数据源
package cn.myclass.stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
* 自定义mysql数据源
* 可以通过实现SourceFunction接口实现,也可以通过继承RichSourceFunction类
* 实现,两者的区别为后者提供的方法更多更丰富。如果只有简单的逻辑则使用前者
* 即可。这里由于需要连接数据库,所以使用了RichSourceFunction,借助类中提供
* 的open以及close方法更加合理的利用资源读取数据。
* @author Yang
*/
public class MysqlDataSource extends RichSourceFunction<String> {
/**
* 预处理对象
*/
private PreparedStatement preparedStatement = null;
/**
* 连接对象
*/
private Connection connection = null;
/**
* 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。
* @param parameters 参数信息
*/
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
//创建连接
connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/flink",
"root",
"root");
// 从word表中读取所有单词
String sql = "select word from word";
// 获得预处理对象
preparedStatement = connection.prepareStatement(sql);
}
/**
* 读取数据时执行此方法,从查询结果中依次获得单词
* @param sourceContext 数据源上下文对象
*/
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
// 执行查询获得结果
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()){
// 将结果添加到收集器中
sourceContext.collect(resultSet.getString("word"));
}
}
/**
* 取消任务时执行
*/
@Override
public void cancel() {
}
/**
* 关闭时的方法,关闭MySQL连接,避免资源占用
*/
@Override
public void close() throws Exception {
if (preparedStatement != null){
preparedStatement.close();
}
if (connection != null){
connection.close();
}
}
}
自定义Mysql数据沉槽
package cn.myclass.stream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
* 自定义Mysql数据沉槽
* @author Yang
*/
public class MysqlDataSink extends RichSinkFunction<Tuple2<String, Integer>> {
/**
* 预处理对象
*/
private PreparedStatement preparedStatement = null;
/**
* 连接对象
*/
private Connection connection = null;
/**
* 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。
* @param parameters 参数信息
*/
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
// 创建连接
connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/flink",
"root",
"root");
// 从word表中读取所有单词
String sql = "insert into word(word,count) values(?,?)";
// 预编译语句并获得预处理对象
preparedStatement = connection.prepareStatement(sql);
}
/**
* 每条结果执行的方法
* @param tuple2 元组数据
* @param context 上下文
*/
@Override
public void invoke(Tuple2<String, Integer> tuple2, Context context) throws Exception {
// 设置sql语句中的第一个和第二个值
preparedStatement.setString(1, tuple2.f0);
preparedStatement.setInt(2, tuple2.f1);
// 执行插入
preparedStatement.executeUpdate();
}
/**
* 关闭时的方法,关闭MySQL连接,避免资源占用
*/
@Override
public void close() throws Exception {
if (preparedStatement != null){
preparedStatement.close();
}
if (connection != null){
connection.close();
}
}
}
主类
package cn.myclass.stream;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
/**
* Flink流处理常用数据源及数据沉槽
* @author Yang
*/
public class DataStreamSourceAndSink {
/**
* 初始化流处理执行环境
*/
private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
* 从socket套接字中获取数据作为数据源
*/
public static void socketTextStream() throws Exception {
// 从本地socket套接字中读取数据
DataStreamSource<String> dataStream = env.socketTextStream("localhost", 9999);
// 打印输入的内容
dataStream.print();
// 执行任务
env.execute();
}
/**
* 从文本文件中读取数据作为数据源
* 文本文件可以是本地文件,也可以是hdfs中的文件,只需要指定路径即可
*/
public static void readTextFile() throws Exception {
// 从本地文本文件中读取数据
DataStreamSource<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
// 从hdfs文件系统中读取数据
//DataStreamSource<String> dataStream = env.readTextFile("hdfs://master:9000/word");
// 将文本中每行单词切分成单个单词并收集
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] words = s.split("\t");
for(String word: words){
collector.collect(word);
}
}
}).print();
// 执行任务
env.execute();
}
/**
* 从生成序列中读取数据作为数据源
*/
public static void generateSequence() throws Exception {
// 设置并行度,默认为CPU核心数,这里设置为1可以防止输出乱序
env.setParallelism(1);
// 生成1-10的序列并输出
env.generateSequence(1, 10).print();
// 执行任务
env.execute();
}
/**
* 从Java.util.Collection集合中读取数据作为数据源
*/
public static void fromCollection() throws Exception {
ArrayList<String> list = new ArrayList<>(5);
list.add("flink");
list.add("scala");
list.add("spark");
list.add("hadoop");
list.add("hive");
env.fromCollection(list).print();
// 执行任务
env.execute();
}
/**
* 从Java.util.Collection集合中读取数据作为数据源
*/
public static void fromElements() throws Exception {
env.fromElements("flink", "scala", "spark", "hadoop", "hive").print();
// 执行任务
env.execute();
}
/**
* 从自定义数据源中读取数据
*/
public static void addSource() throws Exception {
// 添加自定义数据源并打印读取的数据
env.addSource(new MysqlDataSource()).print();
// 执行任务
env.execute();
}
/**
* 将结果写入到文本文件中
*/
public static void writeAsText() throws Exception {
// 从本地文本文件中读取数据
DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
// 设置并行度为1,将结果写入到一个文件中
env.setParallelism(1);
// 将结果写入到hdfs中
//dataStream.writeAsText("hdfs://master:9000/words.txt");
// 将结果写到本地文本文件中
dataStream.writeAsText("FlinkModule/src/main/resources/stream/words.txt");
// 执行任务
env.execute();
}
/**
* 将结果写入到csv文件中
* 注意:将结果写入到csv只支持元组类型的数据
*/
public static void writeAsCsv() throws Exception {
// 从本地文本文件中读取数据
DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
// 设置并行度
env.setParallelism(1);
// 将单词转化成元组
dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = s.split("\t");
for(String word: words){
collector.collect(new Tuple2<>(word, 1));
}
}
}).writeAsCsv("FlinkModule/src/main/resources/stream/words.csv");
// 执行任务
env.execute();
}
/**
* 将结果写入到socket套接字中
*/
public static void writeToSocket() throws Exception {
// 从本地文本文件中读取数据
DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
// 将结果写入到socket套接字中,以简单字符串类型发送
dataStream.writeToSocket("localhost", 9999, new SimpleStringSchema());
// 执行任务
env.execute();
}
/**
* 将结果写入自定义数据沉槽
*/
public static void addSink() throws Exception {
// 从本地文本文件中读取数据
DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
// 将单词形成元组并初始化次数为1
dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = s.split("\t");
for(String word: words){
collector.collect(new Tuple2<>(word, 1));
}
}
// 写入MySql中
}).addSink(new MysqlDataSink());
// 执行任务
env.execute();
}
public static void main(String[] args) throws Exception {
socketTextStream();
readTextFile();
generateSequence();
fromCollection();
fromElements();
addSource();
writeAsText();
writeAsCsv();
writeToSocket();
addSink();
}
}
如有错误,望指正!
上一篇: Flink批处理-DataSet常用Source及Sink
下一篇: 时间、财富、勤劳、永恒、轮回