
Flink Streaming API: Sink

Original article: https://www.toutiao.com/i6859235904779715076/

This article covers Flink's streaming Sink API in three parts:

1. Writing to Kafka

2. Writing to Redis

3. Writing to MySQL

A data-processing job basically has three stages: where the data comes from, what business logic is applied to it, and where the results end up.

In Flink these three parts are called Source, Transform, and Sink.

For the Source part, see the companion article: Flink Streaming API: Source.

Flink has no equivalent of Spark's foreach method that lets user code iterate over the data directly; all output to external systems goes through a Sink. A job's final output is wired up with something like:

stream.addSink(new MySink(xxxx))


Flink officially provides sinks for a number of frameworks; beyond those, users need to implement their own sinks.
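For reference, a custom sink is simply a class that extends (Rich)SinkFunction. The following is a minimal sketch (the class name and behavior are made up for illustration) that only prints each record; a real sink would open a client in open() and write to the external system in invoke():

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

// hypothetical example sink: opens no real connection, just prints records
class MyPrintSink extends RichSinkFunction[String] {
  override def open(parameters: Configuration): Unit = {
    // create connections / clients here
  }

  // called once for every element of the stream
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    println(s"sinking record: $value")
  }

  override def close(): Unit = {
    // release resources here
  }
}

It would be attached with stream.addSink(new MyPrintSink()).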

(Figure: sinks supported by Flink)

Versions:

Scala: 2.11.12

Kafka: 0.8.2.2

Flink: 1.7.2

Redis: 3.2.9

MySQL: 5.7.30

pom.xml dependencies (the logging dependencies must be included; otherwise Flink fails with "Failed to instantiate SLF4J LoggerFactory Reported exception" when connecting to Kafka 0.8):

<dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

        <!-- Kafka 0.8 connector for Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>


        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.22</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.22</version>
        </dependency>

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>


    </dependencies>
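Because slf4j-log4j12 is used as the logging binding, a log4j configuration file is also needed on the classpath; a minimal log4j.properties under src/main/resources could look like this (the level and pattern here are just an example):

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n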

1. Writing to Kafka

Flink and Kafka are a natural pair, so sinking to Kafka is quite convenient.

package xxx

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._

// SensorReading is defined elsewhere (see the Source article), e.g.:
// case class SensorReading(id: String, timestamp: Long, temperature: Double)
object SinkDataToKafka {
  def main(args: Array[String]): Unit = {

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // source
    val value: DataStream[String] = environment.readTextFile("src\\main\\resources\\sensor.txt")

    // transform
    val maped: DataStream[String] = value.map(line => {
      val fields: Array[String] = line.split(",")  // Scala's split method
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble).toString  // to String for easy output
    })

    // FlinkKafkaProducer08 arguments: broker list, topic, serialization schema
    maped.addSink(new FlinkKafkaProducer08[String](
      "slave1:9092,slave2:9092,slave3:9092", "out", new SimpleStringSchema()))

    environment.execute()
  }
}
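The examples in this article all read src\main\resources\sensor.txt and split each line on a comma, so the file is assumed to hold one reading per line in the form id,timestamp,temperature, for example (values are made up):

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4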

2. Writing to Redis

Redis runs on slave4, port 6379, password 123.

package xxx

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object SinkDataToRedis {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // source
    val value: DataStream[String] = environment.readTextFile("src\\main\\resources\\sensor.txt")

    // transform
    val maped: DataStream[SensorReading] = value.map(line => {
      val fields: Array[String] = line.split(",")  // Scala's split method
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
    })

    // Redis connection settings
    val conf = new FlinkJedisPoolConfig.Builder().setHost("slave4").setPort(6379).setPassword("123").build()

    maped.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper())) // inspect the result with: hgetall sensor

    environment.execute()
  }
}

class MyRedisMapper extends RedisMapper[SensorReading]{
  // the Redis command used to store the data
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor")
  }

  // the hash field (key) under which a record is stored
  override def getKeyFromData(t: SensorReading): String = {
    t.id
  }

  // the value stored in Redis
  override def getValueFromData(t: SensorReading): String = {
    t.temperature.toString
  }
}
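After the job has run, the stored hash can be inspected from the command line (assuming redis-cli is available on a host that can reach slave4):

redis-cli -h slave4 -p 6379 -a 123 HGETALL sensor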

3. Writing to MySQL

MySQL runs on slave3, port 3306, user root, password root.

Database: sensor_db, table: sensors (the name used in the code below)

Column information:
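The column figure from the original post is not reproduced here; a table definition consistent with the sink code below (column names and types are inferred from the code, so treat this as an assumption) would be:

CREATE TABLE sensors (
  id   VARCHAR(32) NOT NULL,  -- written with setString
  temp DOUBLE      NOT NULL   -- written with setDouble
);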


package xxx

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object SinkDataToMySQL {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // source
    val value: DataStream[String] = environment.readTextFile("src\\main\\resources\\sensor.txt")

    // transform
    val maped: DataStream[SensorReading] = value.map(line => {
      val fields: Array[String] = line.split(",")  // Scala's split method
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
    })

    maped.addSink(new MyJDBCSink())

    environment.execute()
  }

}

class MyJDBCSink() extends RichSinkFunction[SensorReading]{
  // JDBC connection and prepared statements
  var connect: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // initialization: open the connection and prepare the statements once
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    connect = DriverManager.getConnection("jdbc:mysql://slave3:3306/sensor_db?useUnicode=true&characterEncoding=UTF-8", "root", "root")
    insertStmt = connect.prepareStatement("INSERT INTO sensors(id, temp) VALUES (?, ?)")
    updateStmt = connect.prepareStatement("UPDATE sensors SET temp = ? WHERE id = ?")
  }

  // called once per record: execute the SQL
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // try the update first
    updateStmt.setDouble(1, value.temperature)  // setDouble matches the column type; 1 is the index of the first ? placeholder
    updateStmt.setString(2, value.id)
    updateStmt.execute()

    // if the update matched no existing row, insert a new one
    if (updateStmt.getUpdateCount == 0){
      insertStmt.setDouble(2, value.temperature)
      insertStmt.setString(1, value.id)
      insertStmt.execute()
    }
  }

  // close the connection
  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    connect.close()
  }
}