欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink source之与kafka以及exactly-once

程序员文章站 2022-06-16 16:42:02
...

flink的source

flink的source也就是源大部分应该都是kafka的数据

Flink+kafka

api

pom

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
//自定义一个kafka工具类,免得每次都要写一遍重复代码
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object MyKafkaUtil {
  val prop = new Properties()
  prop.setProperty("bootstrap.servers","192.168.199.100:9092")
  prop.setProperty("group.id","flinkdemo")

  def getConsumer(topic:String ):FlinkKafkaConsumer011[String]= {
    val myKafkaConsumer:FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
    myKafkaConsumer
  }
}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object WordCountStream {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = env.socketTextStream("192.168.199.100",9999)

    //如果流过来时候输入空格,可能会有空字符   filter(_.nonEmpty)过滤掉空字符
    import org.apache.flink.api.scala._
    val word2CountDataStream: DataStream[(String, Int)] = dataStream.flatMap(_.split(" ")).filter(_.nonEmpty).map((_,1)).keyBy(0).sum(1)

    word2CountDataStream.print()

    env.execute()
  }
}

如何保证exactly-once语义的

Flink通过checkpoint来保存数据是否处理完成的状态

由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。

执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。

如果宕机需要通过StateBackend进行恢复,只能恢复所有确认提交的操作。
Flink source之与kafka以及exactly-once

相关标签: # flink flink