
Flink Streaming API: Sink


Flink Streaming API

The code is organized into four modules: environment, source, transform, and sink.

Sink

In Flink, all output to external systems goes through a sink.
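Every example below reads a comma-separated sensor file and maps each line onto a simple case class. The original article defines it in an earlier section; here is a minimal sketch, with field names inferred from how the code below uses them:

// One reading per line in sensor.txt, e.g. "sensor_1,1547718199,35.8"
case class SensorReading(id: String, timestamp: Long, temperature: Double)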

Kafka

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataStream = env.readTextFile("E:/qmlidea/flink/src/main/resources/sensor.txt")

    // Parse each non-empty CSV line into a SensorReading, then serialize it back to a String for Kafka
    val dataStream2 = dataStream.filter(x => !x.isEmpty)
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble).toString
      })

    // Write the stream to the "qml" topic on the given broker, encoding each record as a UTF-8 string
    dataStream2.addSink(new FlinkKafkaProducer[String]("192.168.0.80:9092", "qml", new SimpleStringSchema()))

    env.execute("kafka sink test")
  }
}
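In newer versions of the universal flink-connector-kafka, the (brokerList, topic, schema) constructor used above is deprecated in favor of a Properties-based one. A sketch of that variant, assuming the same broker and topic (swap it in for the addSink line above):

val props = new java.util.Properties()
props.setProperty("bootstrap.servers", "192.168.0.80:9092")

// Same sink, but configured through Kafka producer properties
dataStream2.addSink(new FlinkKafkaProducer[String]("qml", new SimpleStringSchema(), props))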
Matching dependency versions (shown as an image in the original article, not reproduced here).
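As a rough sbt sketch of the artifacts these examples rely on — the artifact names are real, but the versions are placeholders that must be aligned with your own Flink and Scala release:

// build.sbt (versions are placeholders)
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                    % "<flink-version>",
  "org.apache.flink" %% "flink-streaming-scala"          % "<flink-version>",
  "org.apache.flink" %% "flink-connector-kafka"          % "<flink-version>",
  "org.apache.flink" %  "flink-connector-redis_2.11"     % "1.1.5",
  "org.apache.flink" %% "flink-connector-elasticsearch6" % "<flink-version>",
  "mysql"            %  "mysql-connector-java"           % "<mysql-version>"
)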

Redis

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataStream = env.readTextFile("E:/qmlidea/flink/src/main/resources/sensor.txt")

    // Parse each non-empty CSV line into a SensorReading
    val dataStream2 = dataStream.filter(x => !x.isEmpty)
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })

    // Connection settings for the local Redis instance
    val conf = new FlinkJedisPoolConfig.Builder()
      .setDatabase(0)
      .setHost("localhost")
      .setPort(6379)
      .build()

    dataStream2.addSink(new RedisSink(conf, new MyRedisMapper))

    env.execute("redis sink test")
  }
}

// Maps each SensorReading to a Redis command:
// HSET sensorReading <sensor id> <temperature>
class MyRedisMapper extends RedisMapper[SensorReading] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensorReading")
  }

  override def getKeyFromData(t: SensorReading): String = t.id

  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}
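Each reading ends up as a field of a single Redis hash named sensorReading, keyed by sensor id. A quick way to verify, sketched with the Jedis client that flink-connector-redis already uses internally:

import redis.clients.jedis.Jedis

object RedisCheck {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("localhost", 6379)
    // Latest temperature per sensor id, as written by MyRedisMapper
    println(jedis.hgetAll("sensorReading"))
    jedis.close()
  }
}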

Elasticsearch

// The imports below assume the elasticsearch6 connector; adjust for your ES version
import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object ESSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataStream = env.readTextFile("E:/qmlidea/flink/src/main/resources/sensor.txt")

    // Parse each non-empty CSV line into a SensorReading
    val dataStream2 = dataStream.filter(x => !x.isEmpty)
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })

    // Elasticsearch cluster addresses
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    val esSink = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          // Build the document body
          val json = new util.HashMap[String, String]()
          json.put("sensor", t.id)
          json.put("temperature", t.temperature.toString)
          json.put("time", t.timestamp.toString)

          // Index the document into the "sensor" index under type "data"
          val indexRequest = Requests.indexRequest()
            .index("sensor")
            .`type`("data")
            .source(json)

          requestIndexer.add(indexRequest)

          println("write")
        }
      }
    )
    dataStream2.addSink(esSink.build())

    env.execute("es sink test")
  }
}
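The builder batches index requests before sending them, so when testing locally it can help to flush every single request. A sketch of that option, placed right before esSink.build() in the example above:

// Flush each IndexRequest immediately instead of batching (useful when verifying locally)
esSink.setBulkFlushMaxActions(1)

dataStream2.addSink(esSink.build())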

Custom JDBC Sink

import java.sql.{Connection, DriverManager, PreparedStatement, SQLException}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object JDBCSinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataStream = env.readTextFile("E:/qmlidea/flink/src/main/resources/sensor.txt")

    // Parse each non-empty CSV line into a SensorReading
    val dataStream2 = dataStream.filter(x => !x.isEmpty)
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })

    dataStream2.addSink(new MyJdbcSink)

    env.execute("jdbc sink test")
  }
}

// A hand-rolled sink that writes each reading into MySQL via JDBC
class MyJdbcSink extends RichSinkFunction[SensorReading] {

  var conn: Connection = _
  var insertStmt: PreparedStatement = _

  // Open the connection and prepare the statement once per parallel instance
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("INSERT INTO sensor (id, temp) VALUES (?, ?)")
  }

  // Called once per record
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    insertStmt.setString(1, value.id)
    insertStmt.setDouble(2, value.temperature)
    if (insertStmt.executeUpdate() == 0) {
      print("insert failed")
    }
  }

  // Release JDBC resources when the task shuts down
  override def close(): Unit = {
    try {
      if (insertStmt != null) {
        insertStmt.close()
      }
    } catch {
      case e: SQLException => e.printStackTrace()
    } finally {
      try {
        if (conn != null) {
          conn.close()
        }
      } catch {
        case e: SQLException => e.printStackTrace()
      }
    }
  }
}
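A common refinement, not part of the code above, is to attempt an UPDATE first and only INSERT when no row was affected, so repeated readings from the same sensor upsert instead of accumulating rows. A sketch against the same sensor(id, temp) table, using the same imports as above (the class name is made up for illustration):

class MyJdbcUpsertSink extends RichSinkFunction[SensorReading] {

  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("INSERT INTO sensor (id, temp) VALUES (?, ?)")
    updateStmt = conn.prepareStatement("UPDATE sensor SET temp = ? WHERE id = ?")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // Try to update the existing row for this sensor first
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    if (updateStmt.executeUpdate() == 0) {
      // No row for this sensor yet: insert a new one
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.executeUpdate()
    }
  }

  override def close(): Unit = {
    if (updateStmt != null) updateStmt.close()
    if (insertStmt != null) insertStmt.close()
    if (conn != null) conn.close()
  }
}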

