Flink source之与kafka以及exactly-once
程序员文章站
2022-06-16 16:42:02
...
flink的source
flink的source也就是源大部分应该都是kafka的数据
Flink+kafka
api
pom
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.7.0</version>
</dependency>
//自定义一个kafka工具类,免得每次都要写一遍重复代码
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
object MyKafkaUtil {
val prop = new Properties()
prop.setProperty("bootstrap.servers","192.168.199.100:9092")
prop.setProperty("group.id","flinkdemo")
def getConsumer(topic:String ):FlinkKafkaConsumer011[String]= {
val myKafkaConsumer:FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
myKafkaConsumer
}
}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object WordCountStream {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = env.socketTextStream("192.168.199.100",9999)
//如果流过来时候输入空格,可能会有空字符 filter(_.nonEmpty)过滤掉空字符
import org.apache.flink.api.scala._
val word2CountDataStream: DataStream[(String, Int)] = dataStream.flatMap(_.split(" ")).filter(_.nonEmpty).map((_,1)).keyBy(0).sum(1)
word2CountDataStream.print()
env.execute()
}
}
如何保证exactly-once语义的
Flink通过checkpoint来保存数据是否处理完成的状态
由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。
执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。
如果宕机需要通过StateBackend进行恢复,只能恢复所有确认提交的操作。