Flink Streaming API: Sink
Original article: https://www.toutiao.com/i6859235904779715076/
This article covers Flink's streaming Sink API in three parts:
1. Writing to Kafka
2. Writing to Redis
3. Writing to MySQL
A data-processing job can roughly be split into three stages: where the data comes from, what business logic is applied to it, and where the results are written.
In Flink these three stages are called Source, Transform, and Sink.
For the Source stage, see the companion post: Flink Streaming API: Source.
Unlike Spark, Flink offers no foreach-style method for users to iterate over results directly; every external output goes through a Sink. The final output step of a job looks like this:
stream.addSink(new MySink(xxxx))
The framework ships with sinks for a number of systems out of the box; for anything else, you implement your own sink.
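As a sketch of what such a user-defined sink can look like (MySink here is hypothetical and only illustrates the SinkFunction contract; it is not part of the original article):

import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical MySink: the simplest possible custom sink, printing each record
class MySink(prefix: String) extends SinkFunction[String] {
  // invoke() is called once for every element that reaches the sink
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    println(s"$prefix $value")
  }
}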
Sinks supported by Flink: the bundled connectors cover Kafka, Cassandra, Kinesis, Elasticsearch, filesystems (HDFS), RabbitMQ and NiFi, and Apache Bahir adds connectors such as Redis, Flume and ActiveMQ.
Versions used in this article:
Scala: 2.11.12
Kafka: 0.8.2.2
Flink: 1.7.2
Redis: 3.2.9
MySQL: 5.7.30
pom.xml dependencies (the logging dependencies must be included, otherwise connecting Flink to Kafka 0.8 fails with a "Failed to instantiate SLF4J LoggerFactory Reported exception" error):
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.8_2.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <!-- Logging: required, see the note above -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.22</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.22</version>
    </dependency>
    <!-- Redis connector from Apache Bahir -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
</dependencies>
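All three examples below read src\\main\\resources\\sensor.txt and parse each line into a SensorReading case class, which is defined in the companion Source post. For completeness, a minimal version that matches the fields used in the code (the name of the middle field is an assumption; id and temperature are taken from the code itself):

// e.g. one line of sensor.txt could look like: sensor_1,1547718199,35.8 (illustrative)
case class SensorReading(id: String, timestamp: Long, temperature: Double)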
1. Writing to Kafka
Flink and Kafka are a natural pair, and sinking to Kafka is very convenient.
package xxx

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._

object SinkDataToKafka {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // source
    val value: DataStream[String] = environment.readTextFile("src\\main\\resources\\sensor.txt")
    // transform
    val maped: DataStream[String] = value.map(line => {
      val fildes: Array[String] = line.split(",") // Scala's split: one CSV line into fields
      SensorReading(fildes(0).trim, fildes(1).trim.toLong, fildes(2).trim.toDouble).toString // to String for easy output
    })
    // FlinkKafkaProducer08 arguments: broker list, topic id, serialization schema
    maped.addSink(new FlinkKafkaProducer08[String](
      "slave1:9092,slave2:9092,slave3:9092", "out", new SimpleStringSchema()))
    environment.execute()
  }
}
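If the producer needs tuning (acks, retries, and so on), the 0.8 connector also offers a constructor that takes a java.util.Properties object instead of a plain broker list, replacing the addSink call above. A hedged sketch; the property values are illustrative, not from the original article:

import java.util.Properties

val props = new Properties()
props.setProperty("bootstrap.servers", "slave1:9092,slave2:9092,slave3:9092")
props.setProperty("acks", "1")    // illustrative: wait for the partition leader only
props.setProperty("retries", "3") // illustrative: retry transient send failures
maped.addSink(new FlinkKafkaProducer08[String]("out", new SimpleStringSchema(), props))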
2. Writing to Redis
Redis runs on slave4, port 6379, password 123.
package xxx

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object SinkDataToRedis {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // source
    val value: DataStream[String] = environment.readTextFile("src\\main\\resources\\sensor.txt")
    // transform
    val maped: DataStream[SensorReading] = value.map(line => {
      val fildes: Array[String] = line.split(",") // Scala's split: one CSV line into fields
      SensorReading(fildes(0).trim, fildes(1).trim.toLong, fildes(2).trim.toDouble)
    })
    // Redis connection settings
    val conf = new FlinkJedisPoolConfig.Builder().setHost("slave4").setPort(6379).setPassword("123").build()
    maped.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper())) // inspect the result with: hgetall sensor
    environment.execute()
  }
}
class MyRedisMapper extends RedisMapper[SensorReading] {
  // the Redis command used to store the data: HSET into the hash named "sensor"
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor")
  }
  // the hash field to write to
  override def getKeyFromData(t: SensorReading): String = {
    t.id
  }
  // the value stored under that field
  override def getValueFromData(t: SensorReading): String = {
    t.temperature.toString
  }
}
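If one Redis key per sensor is preferred over a single hash, the mapper can use RedisCommand.SET instead of HSET. A minimal sketch; the sensor: key prefix is an assumption, not from the original article:

class MyRedisSetMapper extends RedisMapper[SensorReading] {
  // SET writes each record as a plain string key/value pair
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)
  // assumed key layout: sensor:<id>
  override def getKeyFromData(t: SensorReading): String = "sensor:" + t.id
  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}

With the original HSET mapper the result can be checked in redis-cli with hgetall sensor; with this variant, get sensor:<id>.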
3. Writing to MySQL
MySQL runs on slave3, port 3306, username root, password root.
Database: sensor_db, table: sensors (the name used by the SQL statements below).
Fields: id (a string sensor id, used as the key) and temp (a double temperature), matching the INSERT and UPDATE statements in the code.
package xxx

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object SinkDataToMySQL {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // source
    val value: DataStream[String] = environment.readTextFile("src\\main\\resources\\sensor.txt")
    // transform
    val maped: DataStream[SensorReading] = value.map(line => {
      val fildes: Array[String] = line.split(",") // Scala's split: one CSV line into fields
      SensorReading(fildes(0).trim, fildes(1).trim.toLong, fildes(2).trim.toDouble)
    })
    maped.addSink(new MyJDBCSink())
    environment.execute()
  }
}
class MyJDBCSink() extends RichSinkFunction[SensorReading] {
  // JDBC connection and prepared statements
  var connect: Connection = _
  var insertStrmt: PreparedStatement = _
  var updateStrmt: PreparedStatement = _

  // open() runs once per parallel sink instance: create the connection and prepared statements
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    connect = DriverManager.getConnection("jdbc:mysql://slave3:3306/sensor_db?useUnicode=true&characterEncoding=UTF-8", "root", "root")
    insertStrmt = connect.prepareStatement("INSERT INTO sensors(id, temp) VALUES (?, ?)")
    updateStrmt = connect.prepareStatement("UPDATE sensors SET temp = ? WHERE id = ?")
  }

  // invoke() runs once per record: try an update first, fall back to an insert
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // run the update
    updateStrmt.setDouble(1, value.temperature) // setDouble = parameter type, 1 = which ?, value.temperature = the value bound
    updateStrmt.setString(2, value.id)
    updateStrmt.execute()
    // if the update touched no row, insert instead
    if (updateStrmt.getUpdateCount == 0) {
      insertStrmt.setString(1, value.id)
      insertStrmt.setDouble(2, value.temperature)
      insertStrmt.execute()
    }
  }

  // close the statements and the connection
  override def close(): Unit = {
    insertStrmt.close()
    updateStrmt.close()
    connect.close()
  }
}
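The update-then-insert pattern above issues two statements per record. If the sensors table has a primary key or unique index on id (an assumption; the original article does not show the table definition), a single MySQL upsert does the same job. A minimal sketch, using the same imports as the example above:

// assumed DDL: CREATE TABLE sensors (id VARCHAR(64) PRIMARY KEY, temp DOUBLE)
class MyUpsertJDBCSink() extends RichSinkFunction[SensorReading] {
  var connect: Connection = _
  var upsertStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    connect = DriverManager.getConnection("jdbc:mysql://slave3:3306/sensor_db?useUnicode=true&characterEncoding=UTF-8", "root", "root")
    // one statement covers both the insert and the update case
    upsertStmt = connect.prepareStatement(
      "INSERT INTO sensors(id, temp) VALUES (?, ?) ON DUPLICATE KEY UPDATE temp = VALUES(temp)")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    upsertStmt.setString(1, value.id)
    upsertStmt.setDouble(2, value.temperature)
    upsertStmt.execute()
  }

  override def close(): Unit = {
    if (upsertStmt != null) upsertStmt.close()
    if (connect != null) connect.close()
  }
}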