Flink之自定义Source
程序员文章站
2022-06-16 12:14:40
...
主函数
有4种读取数据方式：
1. 从集合中读取数据：env.fromCollection(List(...))
2. 从文件中读取数据：env.readTextFile(path)
3. 从Kafka中读取数据：env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))
4. 第四种就是自定义Source，例如：
val stream4 = env.addSource(new SensorSource())
package com.flink.sourceAndSink
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import scala.util.Random
/**
* Created by Shi shuai RollerQing on 2019/12/19 9:09
*/
/**
 * A single temperature reading reported by one sensor.
 *
 * @param id          sensor identifier, e.g. "sensor_1"
 * @param timestamp   time of the reading; the sample data in SourceTest appears to use
 *                    epoch seconds, while SensorSource stamps readings with
 *                    System.currentTimeMillis (epoch milliseconds)
 * @param temperature measured temperature value
 */
case class SensorReading(
  id: String,
  timestamp: Long,
  temperature: Double
)
/**
 * Demo driver showing four ways to create a Flink source: from a local collection,
 * from a text file, from a Kafka topic, and from a custom SourceFunction (SensorSource).
 *
 * Only stream4 is attached to a sink (print), so it is the only stream whose records
 * this job prints.
 */
object SourceTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Run every operator with a parallelism of 1.
env.setParallelism(1)
// Flink Scala API implicits; presumably needed so type information for
// SensorReading / String can be derived in the calls below.
import org.apache.flink.streaming.api.scala._
// 1. Read data from an in-memory collection.
val stream1 = env.fromCollection(List(
SensorReading("sensor_1", 1547718199, 35.80018327300259),
SensorReading("sensor_6", 1547718201, 15.402984393403084),
SensorReading("sensor_7", 1547718202, 6.720945201171228),
SensorReading("sensor_10", 1547718205, 38.101067604893444)
))
// 2. Read data from a text file, one element per line.
// NOTE(review): hard-coded absolute Windows path; it only resolves on the author's machine.
val stream2 = env.readTextFile("C:\\Users\\HP\\IdeaProjects\\sparkCore\\flink\\src\\main\\resources\\sensor.txt")
// 3. Read data from the Kafka topic "sensor".
// Kafka consumer configuration.
val properties = new Properties()
properties.setProperty("bootstrap.servers", "hadoop01:9092")
properties.setProperty("group.id", "consumer-group")
// NOTE(review): these key/value deserializer settings are presumably ignored by
// FlinkKafkaConsumer, which decodes records with the SimpleStringSchema passed below — confirm.
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
// If the consumer group has no committed offset, start from the latest records.
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))
// Alternative using the Kafka 0.11-specific connector (FlinkKafkaConsumer011):
//val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
// NOTE(review): stream1, stream2 and stream3 are never attached to a sink; presumably Flink
// leaves them out of the executed job — confirm, or add e.g. .print() to try them out.
/**
 * 4. Custom data source: SensorSource emits simulated sensor readings.
 */
val stream4 = env.addSource(new SensorSource())
// Sink: print stream4, prefixing each printed record with "stream4".
stream4.print("stream4")
// Launch the job under the name "source api test".
env.execute("source api test")
}
}
自定义Source
/**
 * A custom, non-parallel Flink source that emits simulated temperature readings.
 *
 * When started, it draws an initial temperature for each sensor "sensor_1" … "sensor_N"
 * from a Gaussian with mean 60 and standard deviation 20. Then, until cancelled, it
 * repeatedly adds a standard-Gaussian step to every sensor's temperature, stamps the
 * whole round with the current wall-clock time (System.currentTimeMillis), and emits
 * the readings one record at a time through the source context. Even though a round
 * is generated in one go, the records still leave the source as an individual stream.
 *
 * @param numSensors number of simulated sensors (default 10)
 * @param intervalMs pause between two rounds of readings in milliseconds (default 100);
 *                   must be non-negative
 */
class SensorSource(numSensors: Int = 10, intervalMs: Long = 100L) extends SourceFunction[SensorReading] {

  // Fail fast at construction instead of inside the running task (Thread.sleep rejects negatives).
  require(intervalMs >= 0, s"intervalMs must be non-negative, got $intervalMs")

  // Whether the source should keep producing data.
  // cancel() is called from a different thread than run(), so this flag must be
  // @volatile; without it the loop in run() may never observe the update and never stop.
  @volatile var running: Boolean = true

  /**
   * Generates readings until cancel() is called.
   *
   * @param ctx source context; each reading is emitted individually via ctx.collect
   */
  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()

    // Initial temperature per sensor; later rounds random-walk from these values.
    var curTemp = (1 to numSensors).map(i => ("sensor_" + i, 60 + rand.nextGaussian() * 20))

    // Produce data until cancel() clears the flag.
    while (running) {
      // Random-walk step: add a standard-normal (Gaussian) increment to each temperature.
      curTemp = curTemp.map { case (id, temp) => (id, temp + rand.nextGaussian()) }

      // All readings of one round share the same timestamp.
      val curTime = System.currentTimeMillis()

      // Emit each reading as its own stream record.
      curTemp.foreach { case (id, temp) => ctx.collect(SensorReading(id, curTime, temp)) }

      Thread.sleep(intervalMs)
    }
  }

  /** Requests the run() loop to stop after its current round. */
  override def cancel(): Unit = running = false
}
上一篇: 请教有没有较好的调试跟踪软件
下一篇: XML的DOM和SAX解析方式