
Flink: Custom Source


Main function

There are four ways to read data into a stream; each is shown in the full program below:

1. Read from a collection: env.fromCollection(List(…))
2. Read from a file: env.readTextFile(path)
3. Read from Kafka: env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))
4. Read from a custom source: val stream4 = env.addSource(new SensorSource())

package com.flink.sourceAndSink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import scala.util.Random

/**
 * Created by Shi shuai RollerQing on 2019/12/19 9:09
 */

// Case class for sensor readings
case class SensorReading( id: String, timestamp: Long, temperature: Double)
object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    import org.apache.flink.streaming.api.scala._
    // 1. Read data from a collection
    val stream1 = env.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))

    // 2. Read data from a file
    val stream2 = env.readTextFile("C:\\Users\\HP\\IdeaProjects\\sparkCore\\flink\\src\\main\\resources\\sensor.txt")


    // 3. Read data from Kafka
    // Create the Kafka-related configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop01:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")


    val stream3 = env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))
    // With the older version-specific connector for Kafka 0.11, this would be:
    //val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // 4. Custom data source

    val stream4 = env.addSource(new SensorSource())

    // Sink: print the stream
    stream4.print("stream4")

    env.execute("source api test")
  }
}
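
The sensor.txt file is not shown in the original; it is assumed here to contain one comma-separated reading per line, for example:

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4

Since readTextFile emits each line as a raw String, a minimal parsing sketch (an assumption, not part of the original program) to turn stream2 into SensorReading records could look like this:

val parsedStream = stream2.map { line =>
  val fields = line.split(",")
  SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
}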

Custom Source

A custom source implements SourceFunction[T]: run(ctx) emits records through the SourceContext until the source is stopped, and cancel() signals run to exit.

class SensorSource() extends SourceFunction[SensorReading]{
  // Flag indicating whether the source is still running; marked @volatile because
  // cancel() is called from a different thread than run()
  @volatile var running: Boolean = true

  // Note the context ctx (SourceFunction.SourceContext[SensorReading]): it is used to emit
  // records downstream via ctx.collect. Even though we generate readings for 10 sensors per
  // batch, they are still emitted one at a time, so the result is still a stream.
  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Create a random number generator
    val rand = new Random()
    // Randomly initialize temperatures for 10 sensors; the stream data then fluctuates around these values
    var curTemp = 1.to(10).map(
      i => ("sensor_" + i, 60 + rand.nextGaussian() * 20)
    )

    // Generate stream data in an infinite loop until cancelled
    while(running){
      // Update the temperature values
      curTemp = curTemp.map(
        t => (t._1, t._2 + rand.nextGaussian())  // nextGaussian draws from a Gaussian (i.e. normal) distribution
      )
      // Get the current timestamp
      val curTime = System.currentTimeMillis()
      // Wrap into a SensorReading and emit it
      curTemp.foreach(
        t => ctx.collect( SensorReading(t._1, curTime, t._2))
      )
      // Wait 100 ms between batches
      Thread.sleep(100)
    }
  }

  override def cancel(): Unit = running = false
}
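
A plain SourceFunction always runs with parallelism 1; Flink rejects the job if a higher parallelism is set on it. As a minimal sketch (ParallelSensorSource is a hypothetical name, not part of the original), the same generator idea can run in parallel by extending ParallelSourceFunction instead, where each subtask executes its own copy of run():

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

import scala.util.Random

class ParallelSensorSource extends ParallelSourceFunction[SensorReading] {
  @volatile var running: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    while (running) {
      // Each subtask emits one random reading per loop iteration
      ctx.collect(SensorReading("sensor_" + (rand.nextInt(10) + 1), System.currentTimeMillis(), 60 + rand.nextGaussian() * 20))
      Thread.sleep(100)
    }
  }

  override def cancel(): Unit = running = false
}

It would be attached the same way as before, e.g. env.addSource(new ParallelSensorSource()).setParallelism(4).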
