Flink Streaming API: Sink
Flink Streaming API
A Flink streaming program is organized into four parts: environment, source, transform, and sink.
Sink
In Flink, all output to external systems goes through a sink. The sections below show the common built-in connectors (Kafka, Redis, Elasticsearch) and a custom JDBC sink.
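Every example below parses lines from sensor.txt into a SensorReading. The case class itself never appears in the original listing; a minimal sketch that matches how the fields are used later (id, timestamp, temperature):

// Minimal sketch of the record type shared by all examples below
case class SensorReading(id: String, timestamp: Long, temperature: Double)

Each line of sensor.txt is expected to be a comma-separated triple of id, timestamp, and temperature.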
Kafka
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer // universal Kafka connector

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // read the sample file and parse each non-empty line into a SensorReading, kept as a String
    val dataStream = env.readTextFile("E:/qmlidea/flink/src/main/resources/sensor.txt")
    val dataStream2 = dataStream.filter(x => !x.isEmpty)
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble).toString
      })

    // write each record to the Kafka topic "qml" as a plain string
    dataStream2.addSink(new FlinkKafkaProducer[String]("192.168.0.80:9092", "qml", new SimpleStringSchema()))
    env.execute("kafka sink test")
  }
}
Dependency versions must match the Flink, Scala, and Kafka versions in use.
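None of these connectors ship with the Flink core distribution. A hedged sbt sketch (the version numbers are placeholders, adjust them to your setup):

// build.sbt (sketch) - versions are placeholders, match them to your cluster
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % "1.10.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.10.0",
  "org.apache.flink" %% "flink-connector-kafka" % "1.10.0"  // universal Kafka connector
)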
Redis
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.readTextFile("E:/qmlidea/flink/src/main/resources/sensor.txt")
    val dataStream2 = dataStream.filter(x => !x.isEmpty)
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })

    // Jedis connection pool configuration for the local Redis instance
    val conf = new FlinkJedisPoolConfig.Builder()
      .setDatabase(0)
      .setHost("localhost")
      .setPort(6379)
      .build()

    dataStream2.addSink(new RedisSink(conf, new MyRedisMapper))
    env.execute("redis sink test")
  }
}

// Maps each SensorReading to an HSET command: HSET sensorReading <sensor id> <temperature>
class MyRedisMapper extends RedisMapper[SensorReading] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensorReading")
  }

  override def getKeyFromData(t: SensorReading): String = t.id

  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}
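The Redis sink is published by Apache Bahir rather than by Flink itself; a hedged sbt line (artifact coordinates and version are assumptions, check the Bahir release matching your Flink version):

// build.sbt (sketch) - the Redis connector comes from Apache Bahir
libraryDependencies += "org.apache.bahir" %% "flink-connector-redis" % "1.0"

With the mapper above, every record becomes HSET sensorReading <id> <temperature>, so the result can be inspected in redis-cli with HGETALL sensorReading.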
Elasticsearch
import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink // assumes the ES 6.x connector
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object ESSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.readTextFile("E:/qmlidea/flink/src/main/resources/sensor.txt")
    val dataStream2 = dataStream.filter(x => !x.isEmpty)
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })

    // addresses of the Elasticsearch cluster nodes
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    val esSink = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          // build the document and wrap it in an IndexRequest targeting index "sensor"
          val json = new util.HashMap[String, String]()
          json.put("sensor", t.id)
          json.put("temperature", t.temperature.toString)
          json.put("time", t.timestamp.toString)
          val indexRequest = Requests.indexRequest()
            .index("sensor")
            .`type`("data")
            .source(json)
          requestIndexer.add(indexRequest)
          println("write")
        }
      }
    )

    dataStream2.addSink(esSink.build())
    env.execute("es sink test")
  }
}
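Because the index request still sets a document type ("data"), the example targets an Elasticsearch 6.x cluster; on 7.x the type call is dropped and the flink-connector-elasticsearch7 artifact is used instead. A hedged sbt line for the 6.x connector (version is a placeholder):

// build.sbt (sketch) - connector for Elasticsearch 6.x clusters
libraryDependencies += "org.apache.flink" %% "flink-connector-elasticsearch6" % "1.10.0"

Once the job has run, the written documents can be checked with a GET sensor/_search query against the cluster.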
Custom JDBC Sink
import java.sql.{Connection, DriverManager, PreparedStatement, SQLException}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object JDBCSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.readTextFile("E:/qmlidea/flink/src/main/resources/sensor.txt")
    val dataStream2 = dataStream.filter(x => !x.isEmpty)
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })

    dataStream2.addSink(new MyJdbcSink)
    env.execute("jdbc sink test")
  }
}

// Writes each SensorReading into the MySQL table `sensor` through a prepared statement.
class MyJdbcSink extends RichSinkFunction[SensorReading] {
  var conn: Connection = _
  var insertStmt: PreparedStatement = _

  // open the connection and prepare the statement once per parallel task
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("INSERT INTO sensor (id, temp) VALUES (?,?)")
  }

  // called once per record
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    insertStmt.setString(1, value.id)
    insertStmt.setDouble(2, value.temperature)
    if (insertStmt.executeUpdate() == 0) {
      print("insert failed")
    }
  }

  // release the statement and the connection when the task shuts down
  override def close(): Unit = {
    try {
      if (insertStmt != null) {
        insertStmt.close()
      }
    } catch {
      case e: SQLException => e.printStackTrace()
    } finally {
      try {
        if (conn != null) {
          conn.close()
        }
      } catch {
        case e: SQLException => e.printStackTrace()
      }
    }
  }
}
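The custom sink only needs a JDBC driver on the classpath and assumes a sensor table with id and temp columns already exists in the test database. A hedged sbt line for the MySQL driver (version is a placeholder):

// build.sbt (sketch) - plain MySQL JDBC driver, version is a placeholder
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.47"

A more robust variant would also prepare an UPDATE statement and only fall back to the INSERT when no row was updated, so that re-running the job does not pile up duplicate rows.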