
Apache Flink

概述

Flink is a stateful stream-processing framework built on data streams, and is commonly called the third generation of big data analytics solutions.

First generation: Hadoop MapReduce for static batch processing plus Storm for real-time stream processing; two independent compute engines, hard to use together (September 2014).

Second generation: Spark RDD for static batch processing and Spark Streaming (DStream) for real-time stream processing (with limited real-time behaviour); a unified compute engine, easier to use (February 2014).

Third generation: Apache Flink, with DataSet as the batch-processing framework and DataStream as the stream-processing framework (December 2014).

Spark and Flink were thus born at almost the same time. Flink is nevertheless considered the third generation mainly because, early on, the understanding of big data analytics was limited and most business scenarios were confined to batch processing, so Flink evolved more slowly than Spark; only around 2017 did people gradually start shifting from batch processing to stream processing.

More background: https://blog.csdn.net/weixin_38231448/article/details/100062961

Typical stream-computing scenarios: real-time analytics, system monitoring, public-opinion monitoring, traffic prediction, power grid monitoring, disease prediction, banking/financial risk control, and similar domains.

Spark VS Flink

Flink

The core of Flink is a streaming dataflow engine for distributed computation over data streams; it provides data distribution, data communication and fault tolerance. On top of this engine, Flink offers several higher-level APIs for writing distributed jobs, for example:

DataSet API: batch operations on static data, abstracted as distributed data sets that users process with the operators Flink provides; supports Java, Scala and Python.

DataStream API: stream operations on data streams, abstracted as distributed data streams that users can transform conveniently; supports Java and Scala.

Table API: queries on structured data, abstracted as relational tables that are queried with an SQL-like DSL; supports Java and Scala.

In addition, Flink provides libraries for specific application domains, for example:

Flink ML: Flink's machine learning library, providing a machine-learning Pipelines API and implementations of several ML algorithms.

Gelly: Flink's graph-processing library, providing graph APIs and implementations of several graph algorithms.

Flink Architecture

Flink Concepts

Tasks and Operator Chains (stage division)

For distributed execution, Flink chains operator subtasks together into Tasks according to the parallelism of the computation (a Task corresponds to a Stage in the Spark framework). A Flink job is therefore split into several Tasks (stages); each Task has its own parallelism, and each parallel instance is a thread (a SubTask).


  • A Task is equivalent to a Stage in a Spark job.

  • Operator Chain: Flink builds Tasks by chaining operators together, somewhat similar to Spark's narrow/wide dependencies. There are two ways operators are connected: forward and hash | rebalance (a sketch of how chaining can be controlled follows below).
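Chaining can also be influenced explicitly from the DataStream API. The following is a minimal sketch (not from the original article) that reuses the socket word count used throughout these notes; startNewChain() and disableChaining() are the relevant calls:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.startNewChain()      //start a new operator chain at this operator
.map((_,1))
.disableChaining()    //never chain this operator with its neighbors
.keyBy(0)
.sum(1)
.print()
env.execute("chaining demo")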

Job Managers, Task Managers, Clients

JobManagers (Master) - coordinate the distributed execution: they schedule Tasks, coordinate checkpoints and coordinate recovery from failures. Roughly the equivalent of Master + Driver in Spark.

There is always at least one JobManager. A high-availability setup will have multiple JobManagers, one of which is always the leader, and the others are standby.

TaskManagers (Slaves) - the worker processes that actually execute the SubTasks (threads) of a job, and that report their status and workload to the JobManager.

Clients - unlike in Spark, the Client is not part of the running cluster. It only builds the dataflow (similar to Spark's DAG) and submits it to the JobManager; after submission it can disconnect. In Spark the client is called the Driver and is responsible for building the DAG and monitoring the whole execution and failure recovery.


Task Slots and Resources

Each Worker (TaskManager) is a JVM process that can execute one or more subtasks (threads / SubTasks). To control how many tasks a Worker accepts, the Worker has so-called Task Slots (at least one).

Each Task Slot represents a fixed subset of the TaskManager's resources. For example, a TaskManager with 3 Task Slots dedicates 1/3 of its managed memory to each slot. Each Job is given a fixed number of Task Slots at startup, so the slot division keeps SubTasks of different Jobs from competing for memory. In the example this section refers to, a Job acquires 6 Task Slots but only runs 5 threads across 3 Tasks.


By default, SubTasks of different Tasks (stages) of the same Job may share a Task Slot; the number of Task Slots a Job needs is determined by the maximum parallelism among its Tasks.

  • A Flink cluster needs exactly as many task slots as the highest parallelism used in the job.
  • This makes it easier to get better resource utilization. Without slot sharing, the non-intensive source subtasks would block as many resources as the resource-intensive window subtasks; with slot sharing, the base parallelism in the example can be increased from 2 to 6, making full use of the slotted resources (a sketch of how slot sharing groups can be set explicitly follows below):
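Slot sharing can also be controlled explicitly by placing operators into named slot sharing groups; operators in different groups never share a slot. A minimal sketch (the group names are illustrative and not from the original article):

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+")).slotSharingGroup("source")   //light-weight source work in its own group
.map((_,1))                                            //inherits the "source" group
.keyBy(0)
.sum(1).slotSharingGroup("aggregation")                //resource-intensive work isolated in another group
.print()
env.execute("slot sharing demo")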


Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html

Flink Environment Setup

Prerequisites

  • JDK 1.8+ installed
  • HDFS up and running (password-free SSH configured)

Flink Installation

  • Upload and extract Flink
[root@centos ~]# tar -zxf flink-1.8.1-bin-scala_2.11.tgz -C /usr/
  • Configure flink-conf.yaml
[root@centos ~]# vi /usr/flink-1.8.1/conf/flink-conf.yaml
jobmanager.rpc.address: centos
taskmanager.numberOfTaskSlots: 4
parallelism.default: 3
  • Configure the slaves file
[root@centos ~]# vi /usr/flink-1.8.1/conf/slaves
centos
  • Start Flink
[root@centos flink-1.8.1]# ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host centos.
Starting taskexecutor daemon on host centos.
[root@centos flink-1.8.1]# jps
2912 Jps
2841 TaskManagerRunner
2397 StandaloneSessionClusterEntrypoint

Open http://centos:8081 in a browser.


Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/local_setup.html

Quick Start

  • Add the Maven dependencies
<properties>
    <flink.version>1.8.1</flink.version>
    <scala.version>2.11</scala.version>
</properties>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
  • Client program
//1. Create the stream execution environment (remote submission | local execution)
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. Read data from the external system (refined in later sections)
val lines:DataStream[String]=env.socketTextStream("centos",9999)
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
// print(env.getExecutionPlan)
//3. Execute the streaming job
env.execute("wordcount")
  • Package the program and submit it
[root@centos flink-1.8.1]# ./bin/flink run --class com.baizhi.demo01.FlinkWordCounts --detached --parallelism 3 /root/original-flink-1.0-SNAPSHOT.jar
Starting execution of program
Job has been submitted with JobID 221d5fa916523f88741e2abf39453b81
[root@centos flink-1.8.1]#
[root@centos flink-1.8.1]# ./bin/flink list -m centos:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
14.10.2019 17:15:31 : 221d5fa916523f88741e2abf39453b81 : wordcount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
  • Cancel the job
[root@centos flink-1.8.1]# ./bin/flink cancel -m centos:8081 221d5fa916523f88741e2abf39453b81
Cancelling job 221d5fa916523f88741e2abf39453b81.
Cancelled job 221d5fa916523f88741e2abf39453b81.

Program Deployment Options

  • Command-line script
[root@centos flink-1.8.1]# ./bin/flink run 
                            --class com.baizhi.demo01.FlinkWordCounts 
                            --detached //run in the background
                            --parallelism 3  //set the parallelism
                            /root/original-flink-1.0-SNAPSHOT.jar
  • Web UI (submit the jar from the JobManager dashboard)

  • Cross-platform (remote environment)

val jarFiles="flink\\target\\original-flink-1.0-SNAPSHOT.jar" //for testing
val env = StreamExecutionEnvironment.createRemoteEnvironment("centos",8081,jarFiles)
  • Local simulation
val env = StreamExecutionEnvironment.createLocalEnvironment(3)
or
val env = StreamExecutionEnvironment.getExecutionEnvironment //auto-detects the runtime environment; typically used in production

DataStream API

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html

Data Sources

A Source is where the program reads its input. A Source is attached with env.addSource(sourceFunction). Flink ships with many pre-implemented SourceFunctions, but custom Sources can always be written by implementing SourceFunction (non-parallel sources) or, for parallel sources, by extending RichParallelSourceFunction or implementing ParallelSourceFunction.

File-based

readTextFile(path) - reads a text file line by line using the TextInputFormat specification and returns each line as a String.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=env.readTextFile("file:///E:\\demo\\words")
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

readFile(fileInputFormat, path) - reads the file once with the given file input format (similar to batch processing).

val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputFormat=new TextInputFormat(null)
val lines:DataStream[String]=env.readFile(inputFormat,"file:///E:\\demo\\words")
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

readFile(fileInputFormat, path, watchType, interval, pathFilter) - the method the two variants above delegate to internally. It reads the files under the given path with the given FileInputFormat and, depending on watchType (FileProcessingMode.PROCESS_CONTINUOUSLY or FileProcessingMode.PROCESS_ONCE), periodically monitors the path with the given interval; pathFilter can exclude files under that path. Note that with PROCESS_CONTINUOUSLY, once a file is modified its entire contents are re-processed.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputFormat=new TextInputFormat(null)
val lines:DataStream[String]=env.readFile(
    inputFormat,"file:///E:\\demo\\words",
    FileProcessingMode.PROCESS_CONTINUOUSLY,
    5000,new FilePathFilter {
        override def filterPath(filePath: Path): Boolean = {
            filePath.getPath.endsWith(".txt")
        }
    })
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

Socket-based

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=env.socketTextStream("centos",9999)
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

Collection-based (for testing)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=env.fromCollection(List("this is a demo","good good"))
//val lines:DataStream[String]=env.fromElements("this is a demo","good good")
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

Custom Source

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import scala.util.Random

class CustomSourceFunction extends ParallelSourceFunction[String]{
  @volatile
  var isRunning:Boolean = true
  val lines:Array[String] = Array("this is a demo","hello word","are you ok")
    
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while(isRunning){
      Thread.sleep(1000)
      ctx.collect(lines(new Random().nextInt(lines.length)))//emit the record downstream
    }
  }

  override def cancel(): Unit = {
    isRunning=false
  }
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=env.addSource[String](new CustomSourceFunction)
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

FlinkKafkaConsumer√

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val lines=env.addSource(new FlinkKafkaConsumer("topic01",new SimpleStringSchema(),props))
    lines.flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(t=>t._1)
    .sum(1)
    .print()
env.execute("wordcount")

With SimpleStringSchema only the value is available. To access more information, such as key/value/partition/offset, a custom deserializer can be written by extending KafkaDeserializationSchema.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.streaming.api.scala._

class UserKafkaDeserializationSchema extends KafkaDeserializationSchema[(String,String)] {
  //this method always returns false
  override def isEndOfStream(nextElement: (String, String)): Boolean = {
    false
  }
    
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
    var key=""
    if(record.key()!=null && record.key().size!=0){
      key=new String(record.key())
    }
    val value=new String(record.value())
    (key,value)
  }
    
  //tell Flink the element type of the tuple
  override def getProducedType: TypeInformation[(String, String)] = {
    createTypeInformation[(String, String)]
  }
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val lines:DataStream[(String,String)]=env.addSource(new FlinkKafkaConsumer("topic01",new UserKafkaDeserializationSchema(),props))
lines.map(t=>t._2).flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

If the records stored in Kafka are JSON strings, the JSON-aware schemas shipped with Flink can be used. Recommended:

  • JsonNodeDeserializationSchema: requires the value to be a JSON string
  • JSONKeyValueDeserializationSchema(includeMetadata): requires both key and value to be JSON, and can additionally expose metadata (partition, offset, etc.)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val jsonData:DataStream[ObjectNode]=env.addSource(new FlinkKafkaConsumer("topic01",new JSONKeyValueDeserializationSchema(true),props))
jsonData.map(on=> (on.get("value").get("id").asInt(),on.get("value").get("name")))
.print()
env.execute("wordcount")

Data Sinks

Data Sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a number of predefined sinks.

File-based

write*: writeAsText | writeAsCsv(...) | writeUsingOutputFormat. Note that the write*() methods on DataStream are mainly intended for debugging purposes.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.writeAsText("file:///E:/results/text",WriteMode.OVERWRITE)
env.execute("wordcount")

The approach above only guarantees at-least-once semantics. In production it is recommended to write to external systems with flink-connector-filesystem, which can guarantee exactly-once semantics.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val bucketingSink = new BucketingSink[(String,Int)]("hdfs://centos:9000/BucketingSink")
bucketingSink.setBucketer(new DateTimeBucketer("yyyyMMddHH"))//bucket (directory) format
bucketingSink.setBatchSize(1024)
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(bucketingSink)
.setParallelism(6)
env.execute("wordcount")

print() | printToErr()

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print("测试") //输出前缀  当有多个流输出到控制台时,可以添加前缀加以区分
.setParallelism(2)
env.execute("wordcount")

Custom Sink

class  CustomSinkFunction extends  RichSinkFunction[(String,Int)]{
  override def open(parameters: Configuration): Unit = {
    println("初始化连接")
  }
    
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    println(value)
  }
    
  override def close(): Unit = {
    println("关闭连接")
  }
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new CustomSinkFunction)
env.execute("wordcount")

RedisSink√

  • Add the dependency
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
class UserRedisMapper extends RedisMapper[(String,Int)]{
  // set the Redis data type
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET,"wordcount")
  }

  override def getKeyFromData(data: (String, Int)): String = {
    data._1
  }

  override def getValueFromData(data: (String, Int)): String = {
    data._2.toString
  }
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val jedisConfig=new FlinkJedisPoolConfig.Builder()
.setHost("centos")
.setPort(6379)
.build()
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new RedisSink[(String, Int)](jedisConfig,new UserRedisMapper))
env.execute("wordcount")

FlinkKafkaProducer√

class UserKeyedSerializationSchema extends KeyedSerializationSchema[(String,Int)]{

  override def serializeKey(element: (String, Int)): Array[Byte] = {
    element._1.getBytes()
  }

  override def serializeValue(element: (String, Int)): Array[Byte] = {
    element._2.toString.getBytes()
  }

  //may be overridden; the default is the topic passed to the producer, and returning null writes the record to that default topic
  override def getTargetTopic(element: (String, Int)): String = {
    null
  }
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props1 = new Properties()
props1.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos:9092")
props1.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1")
val props2 = new Properties()
props2.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos:9092")
props2.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"100")
props2.setProperty(ProducerConfig.LINGER_MS_CONFIG,"500")
props2.setProperty(ProducerConfig.ACKS_CONFIG,"all")
props2.setProperty(ProducerConfig.RETRIES_CONFIG,"2")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props1))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new FlinkKafkaProducer[(String, Int)]("topic02",new UserKeyedSerializationSchema,props2))
env.execute("wordcount")

DataStream Transformations

Map

Takes one element and produces one element.

dataStream.map { x => x * 2 }

FlatMap

Takes one element and produces zero, one, or more elements.

dataStream.flatMap { str => str.split(" ") }

Filter

Evaluates a boolean function for each element and retains those for which the function returns true.

dataStream.filter { _ != 0 }

Union

Union of two or more data streams creating a new stream containing all the elements from all the streams.

dataStream.union(otherStream1, otherStream2, ...)
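As a concrete sketch (host names and ports are illustrative), two streams of the same element type can be unioned and then processed as a single stream:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env.socketTextStream("centos",9999)
val stream2 = env.socketTextStream("centos",8888)
stream1.union(stream2)        //both streams carry String elements
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print()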

Connect

“Connects” two data streams retaining their types, allowing for shared state between the two streams.

case class Word(word:String,count:Int) //Word is not defined in the original snippet; added so the example compiles
val stream1 = env.socketTextStream("centos",9999)
val stream2 = env.socketTextStream("centos",8888)
stream1.connect(stream2).flatMap(line=>line.split("\\s+"),line=>line.split("\\s+"))
.map(Word(_,1))
.keyBy("word")
.sum("count")
.print()

Split

Split the stream into two or more streams according to some criterion.

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)               

Select

Select one or more streams from a split stream.

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
val lines = env.socketTextStream("centos",9999)
val splitStream: SplitStream[String] = lines.split(line => {
    if (line.contains("error")) {
        List("error") //branch name
    } else {
        List("info") //branch name
    }
})
splitStream.select("error").print("error")
splitStream.select("info").print("info")

Side Outputs

val lines = env.socketTextStream("centos",9999)
//define the side-output tag
val outTag = new OutputTag[String]("error")
val results = lines.process(new ProcessFunction[String, String] {
    override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
        if (value.contains("error")) {
            ctx.output(outTag, value)
        } else {
            out.collect(value)
        }
    }
})
results.print("正常结果")
//fetch the side output
results.getSideOutput(outTag)
.print("错误结果")

KeyBy

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning.

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

Reduce

A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.reduce((t1,t2)=>(t1._1,t1._2+t2._2))
.print()

Fold

A “rolling” fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.fold(("",0))((t1,t2)=>(t2._1,t1._2+t2._2))
.print()

Aggregations

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

zs 001 1200
ww 001 1500
zl 001 1000
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toDouble))
.keyBy(1)
.minBy(2)//emits the record that contains the minimum value
.print()
1> (zs,001,1200.0)
1> (zs,001,1200.0)
1> (zl,001,1000.0)
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toDouble))
.keyBy(1)
.min(2)
.print()
1> (zs,001,1200.0)
1> (zs,001,1200.0)
1> (zs,001,1000.0)

State and Fault Tolerance (important)

Stateful operators need to store intermediate computation results while processing the elements/events of a DataStream, which gives state a central role in any fine-grained Flink computation:

  • when tracking data from some point in the past up to now, the state records that history;
  • when aggregating events per minute/hour/day, the state holds the pending aggregates;
  • when training a machine learning model, the state holds the current version of the model parameters.

For state management, Flink uses Checkpoints and Savepoints to achieve fault tolerance. Flink can automatically redistribute state across parallel instances when the job is rescaled; underneath, a State Backend stores the computation state and decides how and where it is kept (covered in a later section).

Flink divides all manageable state into Keyed State and Operator State. Keyed State is bound one-to-one to a key and can only be used on a KeyedStream; all state operations on non-keyed streams are Operator State. Internally, Flink associates Keyed State with a <parallel-operator-instance, key> pair; since every key lives in exactly one operator instance, Keyed State can simply be thought of as bound to <operator, key>. Keyed State is further organized into Key Groups, and every keyed operator interacts with the state of one or more Key Groups.

When redistributing Keyed State, Flink does not move individual keys: the Key Group is the smallest unit of redistribution.
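The number of Key Groups equals the job's maximum parallelism, which can be set explicitly on the environment (a small sketch; 128 is just an example value):

val env = StreamExecutionEnvironment.getExecutionEnvironment
// the maximum parallelism fixes the number of Key Groups used to distribute Keyed State,
// and therefore also the upper bound for rescaling the job
env.setMaxParallelism(128)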

Operator State (also called non-keyed state) is bound to one parallel operator instance. Both Keyed State and Operator State exist in two forms, managed and raw. All built-in Flink operators support managed state, whereas raw state is only used inside user-defined operators and does not support redistribution when the parallelism changes. So although Flink supports raw state, managed state is what is used in virtually all applications.

Keyed State

The keyed-state interfaces give access to different kinds of state, all scoped to the key of the current input element.

  • ValueState<T>: stores a single value that can be updated. Methods: update(T), T value(), clear()
  • ListState<T>: stores a list of elements. Methods: add(T), addAll(List<T>), Iterable<T> get(), update(List<T>), clear()
  • ReducingState<T>: keeps a single value that represents the aggregate of all values added to the state; requires a user-provided ReduceFunction. Methods: add(T), T get(), clear()
  • AggregatingState<IN, OUT>: keeps a single value that represents the aggregate of all values added to the state; requires a user-provided AggregateFunction. Methods: add(IN), OUT get(), clear()
  • FoldingState<T, ACC>: keeps a single value that represents the aggregate of all values added to the state; requires a user-provided FoldFunction. Methods: add(T), ACC get(), clear()
  • MapState<UK, UV>: keeps a map of entries. Methods: put(UK, UV), putAll(Map<UK, UV>), entries(), keys(), values(), clear()

ValueState

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {
    var vs:ValueState[Int]=_
    
    override def open(parameters: Configuration): Unit = {
        val vsd=new ValueStateDescriptor[Int]("valueCount",createTypeInformation[Int])
        vs=getRuntimeContext.getState[Int](vsd)
    }
    
    override def map(value: (String, Int)): (String, Int) = {
        val histroyCount = vs.value()
        val currentCount=histroyCount+value._2
        vs.update(currentCount)
        (value._1,currentCount)
    }
}).print()
env.execute("wordcount")

AggregatingState<IN, OUT>

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toInt))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Double)] {
    var vs:AggregatingState[Int,Double]=_
    override def open(parameters: Configuration): Unit = {
        val vsd=new AggregatingStateDescriptor[Int,(Double,Int),Double]("avgCount",new AggregateFunction[Int,(Double,Int),Double] {
            override def createAccumulator(): (Double, Int) = {
                (0.0,0)
            }

            override def add(value: Int, accumulator: (Double, Int)): (Double, Int) = {
                (accumulator._1+value,accumulator._2+1)
            }
            
            override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) = {
                (a._1+b._1,a._2+b._2)
            }
            
            override def getResult(accumulator: (Double, Int)): Double = {
                accumulator._1/accumulator._2
            }
        },createTypeInformation[(Double,Int)])
        vs=getRuntimeContext.getAggregatingState(vsd)
    }
    override def map(value: (String, Int)): (String, Double) = {
        vs.add(value._2)
        val avgCount=vs.get()
        (value._1,avgCount)
    }
}).print()
env.execute("wordcount")

MapState<UK, UV>

case class Login(id:String,name:String,ip:String,city:String,loginTime:String) //Login is not defined in the original snippet; added so the example compiles
var env=StreamExecutionEnvironment.getExecutionEnvironment
//001 zs 202.15.10.12 日本 2019-10-10
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>Login(ts(0),ts(1),ts(2),ts(3),ts(4)))
.keyBy("id","name")
.map(new RichMapFunction[Login,String] {
    var vs:MapState[String,String]=_
    override def open(parameters: Configuration): Unit = {
        val msd=new MapStateDescriptor[String,String]("mapstate",createTypeInformation[String],createTypeInformation[String])
        vs=getRuntimeContext.getMapState(msd)
    }
    override def map(value: Login): String = {
        println("历史登录")
        for(k<- vs.keys().asScala){
            println(k+" "+vs.get(k))
        }
        var result=""
        if(vs.keys().iterator().asScala.isEmpty){
            result="ok"
        }else{
            if(!value.city.equalsIgnoreCase(vs.get("city"))){
                result="error"
            }else{
                result="ok"
            }
        }
        vs.put("ip",value.ip)
        vs.put("city",value.city)
        vs.put("loginTime",value.loginTime)
        result
    }
}).print()
env.execute("wordcount")

Summary

new Rich[Map|FlatMap]Function {
    var vs:XxxState=_ //declare the state
    override def open(parameters: Configuration): Unit = {
        val xxd=new XxxStateDescriptor //initialize the state descriptor
        vs=getRuntimeContext.getXxxState(xxd)
    }
    override def xxx(value: Xx): Xxx = {
       //operate on the state
    }
}
  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

State Time-To-Live(TTL)

Basic usage

A time-to-live (TTL) can be assigned to keyed state of any type. If a TTL is configured and a stored value has expired, Flink will make a best effort to remove it.

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
  • Example
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {
    var vs:ValueState[Int]=_
    override def open(parameters: Configuration): Unit = {
        val vsd=new ValueStateDescriptor[Int]("valueCount",createTypeInformation[Int])

        val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) //state expires after 5s
        .setUpdateType(UpdateType.OnCreateAndWrite)//the expiration timer is reset on create and on write
        .setStateVisibility(StateVisibility.NeverReturnExpired)//never return expired values
        .build()

        vsd.enableTimeToLive(ttlConfig)

        vs=getRuntimeContext.getState[Int](vsd)
    }
    override def map(value: (String, Int)): (String, Int) = {
        val histroyCount = vs.value()
        val currentCount=histroyCount+value._2
        vs.update(currentCount)
        (value._1,currentCount)
    }
}).print()
env.execute("wordcount")

Note: with TTL enabled, the system consumes extra memory to store the timestamps (processing time). Also, if a job originally ran without TTL and the code is later changed to enable it, restoring from the previous state will fail at startup with a compatibility error (StateMigrationException).

Cleaning Up Expired State

By default, expired values are only removed when the state is explicitly read, i.e. when ValueState.value() is called. This means that state that is never read is never deleted, so the files storing the state may keep growing.

Cleanup in full snapshot

When restoring from the last snapshot, the full state snapshot is loaded and expired entries are filtered out during loading. This does not modify the state already stored on disk; that data is only overwritten at the next checkpoint. It still does not remove expired, never-accessed entries at runtime.

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build

Only applicable to the memory/snapshot-based state backends; not supported by the RocksDB state backend.

Cleanup in background

A background cleanup strategy can be enabled; the concrete strategy depends on the State Backend (different backends clean up differently).

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInBackground
.build
  • Incremental cleanup (heap-based state backends)
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
.setUpdateType(UpdateType.OnCreateAndWrite)
.setStateVisibility(StateVisibility.NeverReturnExpired)
.cleanupIncrementally(100,true) //defaults: 5 | false
.build()

The first parameter means that each time cleanup is triggered, 100 entries are checked. With the second parameter set to false, cleanup is triggered whenever any state is accessed; with true, cleanup is additionally triggered for every incoming record, even if the state is never accessed.

  • RocksDB compaction

RocksDB is an embedded key-value store whose keys and values are arbitrary byte streams. It compacts data asynchronously in the background, merging entries with the same key to keep state files small, but it does not clean up expired state by itself. A compaction filter can be configured so that RocksDB drops expired state during compaction. This filter is disabled by default; it can be enabled with state.backend.rocksdb.ttl.compaction.filter.enabled: true in flink-conf.yaml or with RocksDBStateBackend::enableTtlCompactionFilter in the application code.

import org.apache.flink.api.common.state.StateTtlConfig 
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
.setUpdateType(UpdateType.OnCreateAndWrite)
.setStateVisibility(StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000) //default is 1000
.build()

The value 1000 means that during compaction the filter checks 1000 entries for expiration and removes the ones that have expired.

Operator State

To use Operator State, implement the generic CheckpointedFunction interface or ListCheckpointed<T extends Serializable>. Note that operator state currently only supports list-style state: the stored state must be a List whose elements are serializable.

CheckpointedFunction

It offers two state-redistribution schemes: Even-split and Union.

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
  • snapshotState(): invoked when a checkpoint is taken, to snapshot the state
  • initializeState(): invoked on the first start or when recovering from previous state

Even-split: on failure recovery, the elements of the operator state are split evenly across the operator instances; each instance receives a sub-list of the whole operator state.

Union: on failure recovery, every operator instance receives the complete operator state.
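Which redistribution scheme is used is decided when the state is registered inside initializeState(); a minimal sketch of the relevant part, mirroring the BufferingSink example below (the descriptor name is illustrative):

override def initializeState(context: FunctionInitializationContext): Unit = {
    val lsd = new ListStateDescriptor[(String, Int)]("buffered-elements",createTypeInformation[(String,Int)])
    // Even-split redistribution: each parallel instance receives a sub-list on restore
    listState = context.getOperatorStateStore.getListState(lsd)
    // Union redistribution: each parallel instance would receive the complete list instead
    // listState = context.getOperatorStateStore.getUnionListState(lsd)
}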

Example

class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]  with CheckpointedFunction  {
    var listState:ListState[(String,Int)]=_
    val bufferedElements = ListBuffer[(String, Int)]()
    //writes the buffered data to the external system
    override def invoke(value: (String, Int)): Unit = {
        bufferedElements += value
        if(bufferedElements.size == threshold){
            for(ele <- bufferedElements){
                println(ele)
            }
            bufferedElements.clear()
        }
    }
    //persists the buffered data on savepoint/checkpoint
    override def snapshotState(context: FunctionSnapshotContext): Unit = {
        listState.clear()
        for(ele <- bufferedElements){
            listState.add(ele)
        }
    }
    //creates or restores the state during initialization
    override def initializeState(context: FunctionInitializationContext): Unit = {
        val lsd = new ListStateDescriptor[(String, Int)]("buffered-elements",createTypeInformation[(String,Int)])
        listState=context.getOperatorStateStore.getListState(lsd)
        if(context.isRestored){
            for(element <- listState.get().asScala) {
                bufferedElements += element
            }
        }
    }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new BufferingSink(5))
env.execute("testoperatorstate")
  • Start a netcat server
[root@centos ~]# nc -lk 9999
  • Submit the job

Note: set the parallelism to 1 to make testing easier.

  • Type the following data into netcat
[root@centos ~]# nc -lk 9999
a1 b1 c1 d1
  • Cancel the job and take a savepoint
[root@centos flink-1.8.1]# ./bin/flink list -m centos:8081
------------------ Running/Restarting Jobs -------------------
17.10.2019 09:49:20 : f21795e74312eb06fbf0d48cb8d90489 : testoperatorstate (RUNNING)
--------------------------------------------------------------
[root@centos flink-1.8.1]# ./bin/flink cancel -m centos:8081 -s hdfs:///savepoints f21795e74312eb06fbf0d48cb8d90489
Cancelling job f21795e74312eb06fbf0d48cb8d90489 with savepoint to hdfs:///savepoints.
Cancelled job f21795e74312eb06fbf0d48cb8d90489. Savepoint stored in hdfs://centos:9000/savepoints/savepoint-f21795-38e7beefe07b.

Note: if Flink needs to integrate with Hadoop, HADOOP_HOME or HADOOP_CLASSPATH must be present in the current environment variables.

[root@centos flink-1.8.1]# cat /root/.bashrc
HADOOP_HOME=/usr/hadoop-2.9.2
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
export HADOOP_HOME
HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CLASSPATH
  • Resubmit the job from the savepoint and verify the restored state

ListCheckpointed

The ListCheckpointed interface is a variant of CheckpointedFunction that only supports the Even-split redistribution scheme.

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
  • snapshotState(): invoked when a checkpoint is taken, to snapshot the state
  • restoreState(): the counterpart of initializeState() declared in CheckpointedFunction above, used for state recovery

Example

import java.lang.{Long => JLong} //alias for java.lang.Long
import scala.{Long => SLong} //alias for scala.Long
class CustomStatefulSourceFunction extends ParallelSourceFunction[SLong] with ListCheckpointed[JLong]{
  @volatile
  var isRunning:Boolean = true
  var offset = 0L
    
  override def run(ctx: SourceFunction.SourceContext[SLong]): Unit = {
    val lock = ctx.getCheckpointLock
    while(isRunning){
       Thread.sleep(1000)
       lock.synchronized({
         ctx.collect(offset)
         offset += 1
       })
    }
  }

  override def cancel(): Unit = {
    isRunning=false
  }

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JLong] = {
    Collections.singletonList(offset) //store the source's current offset; if the state cannot be split, Collections.singletonList can be used
  }

  override def restoreState(state: util.List[JLong]): Unit = {
    for (s <- state.asScala) {
      offset = s
    }
  }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.addSource[Long](new CustomStatefulSourceFunction)
.print("offset:")
env.execute("testOffset")

Broadcast State

A third type of supported operator state is the Broadcast State. Broadcast state was introduced to support use cases where some data coming from one stream needs to be broadcast to all downstream tasks, where it is stored locally and used to process all incoming elements on the other stream.

non-keyed

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
class UserBuyPathBroadcastProcessFunction(msd:MapStateDescriptor[String,Int]) extends BroadcastProcessFunction[UserBuyPath,Rule,String]{
    //handles UserBuyPath elements and reads the broadcast state
    override def processElement(value: UserBuyPath,
                                ctx: BroadcastProcessFunction[UserBuyPath, Rule, String]#ReadOnlyContext,
                                out: Collector[String]): Unit = {
        val broadcastState = ctx.getBroadcastState(msd)
        if(broadcastState.contains(value.channel)){//if a rule exists for this channel, evaluate it
            val threshold= broadcastState.get(value.channel)
            if(value.path >= threshold){//emit the users that meet the threshold
                out.collect(value.id+" "+value.name+" "+value.channel+" "+value.path)
            }
        }
    }
    //handles Rule elements and updates the broadcast state
    override def processBroadcastElement(value: Rule, ctx: BroadcastProcessFunction[UserBuyPath, Rule, String]#Context,
                                         out: Collector[String]): Unit = {
        val broadcastState = ctx.getBroadcastState(msd)
        broadcastState.put(value.channel,value.threshold)//update the state

        println("=======rule======")
        for(entry <- broadcastState.entries().asScala){
            println(entry.getKey+"\t"+entry.getValue)
        }
        println()
        println()
    }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
// id name channel action
// 001 mack 手机 view
// 001 mack 手机 view
// 001 mack 手机 addToCart
// 001 mack 手机 buy
val userStream = env.socketTextStream("centos", 9999)
    .map(line => line.split("\\s+"))
    .map(ts => UserAction(ts(0), ts(1), ts(2), ts(3)))
    .keyBy("id", "name")
    .map(new UserActionRichMapFunction)

val msd=new MapStateDescriptor[String,Int]("braodcast-sate",createTypeInformation[String],
                                           createTypeInformation[Int])
// channel threshold
// 手机类   10
val broadcastStream: BroadcastStream[Rule] = env.socketTextStream("centos", 8888)
    .map(line => line.split("\\s+"))
    .map(ts => Rule(ts(0), ts(1).toInt))
    .broadcast(msd)

userStream.connect(broadcastStream)
.process(new UserBuyPathBroadcastProcessFunction(msd))
.print()
env.execute("testoperatorstate")
case class Rule(channel:String,threshold:Int)
case class UserAction(id:String,name:String ,channel:String,action:String)
case class UserBuyPath(id:String,name:String,channel:String,path:Int)
class UserActionRichMapFunction extends RichMapFunction[UserAction,UserBuyPath]{
  var buyPathState:MapState[String,Int]=_
  override def open(parameters: Configuration): Unit = {
   val msd= new MapStateDescriptor[String,Int]("buy-path",createTypeInformation[String],createTypeInformation[Int])
   buyPathState=getRuntimeContext.getMapState(msd)
  }
  override def map(value: UserAction): UserBuyPath = {
    val channel = value.channel
    var path=0
    if(buyPathState.contains(channel)){
      path=buyPathState.get(channel)
    }
    if(value.action.equals("buy")){
      buyPathState.remove(channel)
    }else{
      buyPathState.put(channel,path+1)
    }
    UserBuyPath(value.id,value.name,value.channel,buyPathState.get(channel))
  }
}

keyed

class UserBuyPathKeyedBroadcastProcessFunction(msd:MapStateDescriptor[String,Int]) extends KeyedBroadcastProcessFunction[String,UserAction,Rule,String]{
  override def processElement(value: UserAction,
                              ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    println("value:"+value +" key:"+ctx.getCurrentKey)
    println("=====state======")
    for(entry <- ctx.getBroadcastState(msd).immutableEntries().asScala){
      println(entry.getKey+"\t"+entry.getValue)
    }
  }

  override def processBroadcastElement(value: Rule, ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#Context, out: Collector[String]): Unit = {
     println("Rule:"+value)
    //update the broadcast state
    ctx.getBroadcastState(msd).put(value.channel,value.threshold)
  }
}
case class Rule(channel:String,threshold:Int)
case class UserAction(id:String,name:String ,channel:String,action:String)
var env=StreamExecutionEnvironment.getExecutionEnvironment
// id name channel action
// 001 mack 手机 view
// 001 mack 手机 view
// 001 mack 手机 addToCart
// 001 mack 手机 buy
val userKeyedStream = env.socketTextStream("centos", 9999)
.map(line => line.split("\\s+"))
.map(ts => UserAction(ts(0), ts(1), ts(2), ts(3)))
.keyBy(0)//only a single position argument can be used here

val msd=new MapStateDescriptor[String,Int]("braodcast-sate",createTypeInformation[String],
                                           createTypeInformation[Int])
// channel threshold
// 手机类 10
// 电子类 10
val broadcastStream: BroadcastStream[Rule] = env.socketTextStream("centos", 8888)
.map(line => line.split("\\s+"))
.map(ts => Rule(ts(0), ts(1).toInt))
.broadcast(msd)
userKeyedStream.connect(broadcastStream)
.process(new UserBuyPathKeyedBroadcastProcessFunction(msd))
.print()
env.execute("testoperatorstate")

CheckPoint & SavePoint

A Checkpoint is Flink's fault-tolerance mechanism: based on the configured checkpoint interval, the system periodically and automatically backs up the computation state. If the job fails while running, it recovers from the most recent completed checkpoint.

A Savepoint is an operational tool: the user triggers the state backup manually; internally it is essentially also a checkpoint.

Prerequisites for failure recovery:

  • a persistent data source that can replay records for a certain period of time (e.g. FlinkKafkaConsumer)
  • persistent storage for state, typically a distributed file system (e.g. HDFS)
var env=StreamExecutionEnvironment.getExecutionEnvironment
//enable checkpointing
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
//a checkpoint must complete within 2s, otherwise it is aborted
env.getCheckpointConfig.setCheckpointTimeout(2000)
//minimum pause between checkpoints (must be <= the checkpoint interval)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5)
//number of concurrent checkpoints; defaults to 1
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//fail the task if a checkpoint cannot complete
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
//store checkpoints in an external system (filesystem/rocksdb); configure whether they are retained when the job is cancelled
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(line => line.split("\\s+"))
.map((_,1))
.keyBy(0) //only a single position argument can be used here
.sum(1)
.print()
env.execute("testoperatorstate")

State Backend

The State Backend determines how Flink stores the system state (in the form of checkpoints). Flink currently provides three State Backend implementations.

  • Memory (JobManager): the default implementation, normally used for testing. The computation state is stored in the JobManager's memory; in real production environments, with a lot of state, this easily leads to OOM (out of memory).
  • FileSystem: the computation state is stored in the TaskManagers' memory and, driven by the checkpoint mechanism, backed up to a file system. Commonly used in production, although on very large clusters the TaskManager memory can still overflow.
  • RocksDB: the computation state is stored in the TaskManagers; when memory is insufficient, RocksDB spills state to local disk, and the local state can also be backed up to a remote file system. This makes the RocksDB backend the recommended choice.

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html

Each job can configure its own state backend:

var env=StreamExecutionEnvironment.getExecutionEnvironment
val fsStateBackend:StateBackend = new FsStateBackend("hdfs:///xxx") //MemoryStateBackend, FsStateBackend, RocksDBStateBackend
env.setStateBackend(fsStateBackend)
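For the RocksDB backend specifically, the flink-statebackend-rocksdb_2.11 dependency must be on the classpath; a minimal sketch (the HDFS path is illustrative):

//requires org.apache.flink:flink-statebackend-rocksdb_2.11:${flink.version} on the classpath
var env=StreamExecutionEnvironment.getExecutionEnvironment
//the second argument enables incremental checkpoints, which only upload changes since the last checkpoint
val rocksDBBackend:StateBackend = new RocksDBStateBackend("hdfs:///flink-checkpoints",true)
env.setStateBackend(rocksDBBackend)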

If nothing is configured, the default implementation is used; the default can be changed in flink-conf.yaml:

[root@centos ~]# cd /usr/flink-1.8.1/
[root@centos flink-1.8.1]# vi conf/flink-conf.yaml
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
 state.backend: rocksdb
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
 state.checkpoints.dir: hdfs:///flink-checkpoints
# Default target directory for savepoints, optional.
#
 state.savepoints.dir: hdfs:///flink-savepoints
 
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
 state.backend.incremental: true

Note: HADOOP_CLASSPATH must be present in the environment variables.

Can the operators of a job be modified after it has been deployed?

In Spark this is not allowed, because Spark persists code fragments: once the code changes, the checkpoint has to be deleted. Flink only stores the computation state of each operator, so the code can be changed, provided the user assigns a uid to every stateful operator.

env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.uid("kakfa-consumer")
.flatMap(line => line.split("\\s+"))
.map((_,1))
.keyBy(0) //only a single position argument can be used here
.sum(1)
.uid("word-count") //must be unique
.map(t=>t._1+"->"+t._2)
.print()

How do Flink and Kafka guarantee exactly-once semantics?
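In short: on the read side, FlinkKafkaConsumer stores the partition offsets in Flink's checkpointed state, so after a failure the job rewinds to the offsets of the last completed checkpoint; on the write side, the universal FlinkKafkaProducer can run with Semantic.EXACTLY_ONCE, which combines Kafka transactions with Flink's two-phase commit on checkpoints, so records are only committed once the checkpoint that covers them completes (downstream consumers should read with isolation.level=read_committed). A minimal sketch, assuming the universal flink-connector-kafka dependency and the UserKeyedSerializationSchema defined in the FlinkKafkaProducer section above; topics and hosts are illustrative:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE) //transactions are committed when a checkpoint completes

val consumerProps = new Properties()
consumerProps.setProperty("bootstrap.servers", "centos:9092")
consumerProps.setProperty("group.id", "g1")

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "centos:9092")
//must not exceed transaction.max.timeout.ms configured on the Kafka brokers
producerProps.setProperty("transaction.timeout.ms", "60000")

env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),consumerProps))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new FlinkKafkaProducer[(String, Int)](
    "topic02",
    new UserKeyedSerializationSchema,
    producerProps,
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE)) //Kafka transactions + two-phase commit
env.execute("exactly-once wordcount")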

Windows

Windows are at the heart of stream processing. Windows split the stream into finite-size "buckets" over which aggregations (ProcessWindowFunction, ReduceFunction, AggregateFunction, FoldFunction, ...) can be applied. The basic structure of a windowed computation in Flink looks like this:

Keyed Windows

stream
    .keyBy(...)                
    .window(...)               <-  required: the window assigner (window type)
    [.trigger(...)]            <-  optional: "trigger" (each assigner has a default), decides when the window fires
    [.evictor(...)]            <-  optional: "evictor" (none by default), removes elements from the window
    [.allowedLateness(...)]    <-  optional: "lateness" (default 0), how long to accept late data
    [.sideOutputLateData(...)] <-  optional: "output tag", routes late data into a side-output stream
    .reduce/aggregate/fold/apply()  <-  required: "function", the aggregation applied to the window contents
    [.getSideOutput(...)]      <-  optional: "output tag", retrieves the side output (usually the late data)

Non-Keyed Windows

stream
    .windowAll(...)            <-  required: the window assigner (window type)
    [.trigger(...)]            <-  optional: "trigger" (each assigner has a default), decides when the window fires
    [.evictor(...)]            <-  optional: "evictor" (none by default), removes elements from the window
    [.allowedLateness(...)]    <-  optional: "lateness" (default 0), how long to accept late data
    [.sideOutputLateData(...)] <-  optional: "output tag", routes late data into a side-output stream
    .reduce/aggregate/fold/apply()  <-  required: "function", the aggregation applied to the window contents
    [.getSideOutput(...)]      <-  optional: "output tag", retrieves the side output (usually the late data)

Window Lifecycle

In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness (see Allowed Lateness). Flink guarantees removal only for time-based windows and not for other types, e.g. global windows (see Window Assigners).

in addition, each window will have a Trigger (see Triggers) and a function (ProcessWindowFunction, ReduceFunction,AggregateFunction or FoldFunction) (see Window Functions) attached to it. The function will contain the computation to be applied to the contents of the window, while the Trigger specifies the conditions under which the window is considered ready for the function to be applied.

Apart from the above, you can specify an Evictor (see Evictors) which will be able to remove elements from the window after the trigger fires and before and/or after the function is applied.

Window Assigners

The window assigner defines how elements are assigned to windows. This is done by specifying the WindowAssigner of your choice in the window(...) (for keyedstreams) or the windowAll() (for non-keyed streams) call.

A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink comes with pre-defined window assigners for the most common use cases, namely tumbling windows, sliding windows, session windows and global windows. You can also implement a custom window assigner by extending the WindowAssigner class. All built-in window assigners (except the global windows) assign elements to windows based on time, which can either be processing time or event time.

Tumbling Windows

Tumbling windows have a fixed length; the slide equals the window length, so windows do not overlap.

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce((v1,v2)=>(v1._1,v1._2+v2._2))
.print()
env.execute("window")

Sliding Windows

Sliding windows have a fixed length; when the window length is larger than the slide interval, elements belong to overlapping windows.

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(SlidingProcessingTimeWindows.of(Time.seconds(4),Time.seconds(2)))
.process(new ProcessWindowFunction[(String,Int),String,String,TimeWindow]{
    override def process(key: String, context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        val window = context.window
        println(sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd))
        for(e <- elements){
            print(e+"\t")
        }
        println()
    }
})
env.execute("window")

Session Windows (merging windows)

Session windows group elements by gaps of activity: if the gap between elements is smaller than the session gap they are merged into one window; if it is larger, the current window closes and subsequent elements start a new window. Unlike tumbling and sliding windows, session windows have no fixed size; internally they are implemented by merging windows.

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.apply(new WindowFunction[(String,Int),String,String,TimeWindow]{
    override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        println(sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd))
        for(e<- input){
            print(e+"\t")
        }
        println()
    }
})
env.execute("window")

Global Windows

A global window puts all elements with the same key into a single window that by default never closes (never fires), because it has no default Trigger; the user must supply a custom Trigger.


var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(GlobalWindows.create())
.trigger(CountTrigger.of[GlobalWindow](3))
.apply(new WindowFunction[(String,Int),String,String,GlobalWindow]{
    override def apply(key: String, window: GlobalWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
        println("=======window========")
        for(e<- input){
            print(e+"\t")
        }
        println()
    }
})
env.execute("window")

Window Functions

Once the system considers a window ready, a Window Function is invoked to aggregate its contents. The common window functions are: ReduceFunction, AggregateFunction, FoldFunction, and ProcessWindowFunction | WindowFunction (legacy).

ReduceFunction

class SumReduceFunction extends ReduceFunction[(String,Int)]{
  override def reduce(v1: (String, Int), v2: (String, Int)): (String, Int) = {
    (v1._1,v1._2+v2._2)
  }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new SumReduceFunction)// .reduce((v1,v2)=>(v1._1,v1._2+v2._2))
.print()
env.execute("window")

AggregateFunction

class SumAggregateFunction extends AggregateFunction[(String,Int),(String,Int),(String,Int)]{
  override def createAccumulator(): (String,Int) = {
    ("",0)
  }
  override def merge(a: (String,Int), b: (String,Int)): (String,Int) = {
    (a._1,a._2+b._2)
  }
  override def add(value: (String, Int), accumulator: (String,Int)): (String,Int) = {
    (value._1,accumulator._2+value._2)
  }
  override def getResult(accumulator: (String,Int)): (String, Int) = {
    accumulator
  }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("CentOS",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new SumAggregateFunction)
.print()
env.execute("window")

FoldFunction

class SumFoldFunction  extends  FoldFunction[(String,Int),(String,Long)]{
  override def fold(accumulator: (String, Long), value: (String, Int)): (String, Long) = {
    (value._1,accumulator._2+value._2)
  }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",8877)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
//.fold(("",0L),new SumFoldFunction)
.fold(("",0L))((acc,v)=>(v._1,acc._2+v._2))
.print()
env.execute("window")

ProcessWindowFunction

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",7788)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(SlidingProcessingTimeWindows.of(Time.seconds(4),Time.seconds(2)))
.process(new ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{
    override def process(key: String, context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[(String,Int)]): Unit = {
        val results = elements.reduce((v1,v2)=>(v1._1,v1._2+v2._2))
        out.collect(results)
    }
}).print()
env.execute("window")

globalState() | windowState()

  • globalState(), which allows access to keyed state that is not scoped to a window
  • windowState(), which allows access to keyed state that is also scoped to the window
var env=StreamExecutionEnvironment.getExecutionEnvironment

val globalTag = new OutputTag[(String,Int)]("globalTag")

val countsStream = env.socketTextStream("centos", 7788)
.flatMap(_.split("\\s+"))
.map((_, 1))
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2)))
.process(new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
    var wvds: ValueStateDescriptor[Int] = _
    var gvds: ValueStateDescriptor[Int] = _

    override def open(parameters: Configuration): Unit = {
        wvds = new ValueStateDescriptor[Int]("window-value", createTypeInformation[Int])
        gvds = new ValueStateDescriptor[Int]("global-value", createTypeInformation[Int])
    }

    override def process(key: String, context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[(String, Int)]): Unit = {
        val total = elements.map(_._2).sum
        val ws = context.windowState.getState(wvds)
        val gs=context.globalState.getState(gvds)
        val historyWindowValue = ws.value()
        val historyGlobalValue = gs.value()
        out.collect((key, historyWindowValue + total))
        context.output(globalTag, (key, historyGlobalValue + total))
        ws.update(historyWindowValue + total)
        gs.update(historyGlobalValue + total)
    }
})
countsStream.print("窗口统计")
countsStream.getSideOutput(globalTag).print("全局输出")
env.execute("window")

ReduceFunction+ProcessWindowFunction

var env=StreamExecutionEnvironment.getExecutionEnvironment

val globalTag = new OutputTag[(String,Int)]("globalTag")

val countsStream = env.socketTextStream("centos", 7788)
.flatMap(_.split("\\s+"))
.map((_, 1))
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2)))
.reduce(new SumReduceFunction,new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
    override def process(key: String, context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[(String, Int)]): Unit = {
        val total = elements.map(_._2).sum
        out.collect((key, total))
    }
})
countsStream.print("窗口统计")
countsStream.getSideOutput(globalTag).print("全局输出")
env.execute("window")
var env=StreamExecutionEnvironment.getExecutionEnvironment
val countsStream = env.socketTextStream("centos", 7788)
.flatMap(_.split("\\s+"))
.map((_, 1))
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2)))
.fold(("",0L),new SumFoldFunction,new ProcessWindowFunction[(String, Long), (String, Long), String, TimeWindow] {
    override def process(key: String, context: Context,
                         elements: Iterable[(String, Long)],
                         out: Collector[(String, Long)]): Unit = {
        val total = elements.map(_._2).sum
        out.collect((key, total))
    }
}).print()
env.execute("window")

WindowFunction (legacy)

A legacy interface; ProcessWindowFunction is generally used instead.

In some places where a ProcessWindowFunction can be used you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and does not have some advanced features, such as per-window keyed state. This interface will be deprecated at some point.

env.socketTextStream("centos",7788)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1) //cannot keyBy() by position here
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.apply(new WindowFunction[(String,Int),(String,Int),String,TimeWindow] {
    override def apply(key: String,
                       window: TimeWindow,
                       input: Iterable[(String, Int)],
                       out: Collector[(String, Int)]): Unit = {
        out.collect((key,input.map(_._2).sum))
    }
}).print()
env.execute("window")

Triggers

A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).

Default triggers per WindowAssigner:

  • global window: NeverTrigger
  • event-time window: EventTimeTrigger
  • processing-time window: ProcessingTimeTrigger

The trigger interface has five methods that allow a Trigger to react to different events:

  • The onElement() method is called for each element that is added to a window.
  • The onEventTime() method is called when a registered event-time timer fires.
  • The onProcessingTime() method is called when a registered processing-time timer fires.
  • The onMerge() method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
  • Finally the clear() method performs any action needed upon removal of the corresponding window.
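Custom triggers implement these methods. As a minimal sketch (not from the original article), the following trigger fires the window function on every incoming element instead of waiting for the window to end; it would be attached with .trigger(new EveryElementTrigger) on a TimeWindow-based windowed stream:

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class EveryElementTrigger extends Trigger[Any,TimeWindow] {
  //fire the window for every element that arrives
  override def onElement(element: Any, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}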

DeltaTrigger

var env=StreamExecutionEnvironment.getExecutionEnvironment

val deltaTrigger =  DeltaTrigger.of[(String,Double),GlobalWindow](2.0,new DeltaFunction[(String,Double)] {
    override def getDelta(oldDataPoint: (String, Double), newDataPoint: (String, Double)): Double = {
        newDataPoint._2-oldDataPoint._2
    }
},createTypeInformation[(String,Double)].createSerializer(env.getConfig))

env.socketTextStream("centos",7788)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toDouble))
.keyBy(0)
.window(GlobalWindows.create())
.trigger(deltaTrigger)
.reduce((v1:(String,Double),v2:(String,Double))=>(v1._1,v1._2+v2._2))
.print()
env.execute("window")

Evictors

The evictor has the ability to remove elements from a window after the trigger fires and before and/or after the window function is applied. To do so, the Evictor interface has two methods:

public interface Evictor<T, W extends Window> extends Serializable {
	void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

}

ErrorEvictor

class ErrorEvictor(isBefore:Boolean) extends Evictor[String,TimeWindow] {
    override def evictBefore(elements: lang.Iterable[TimestampedValue[String]], size: Int, window: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
        if(isBefore){
            evictor(elements,size,window,evictorContext)
        }
    }

    override def evictAfter(elements: lang.Iterable[TimestampedValue[String]], size: Int, window: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
        if(!isBefore){
            evictor(elements,size,window,evictorContext)
        }
    }
    private def evictor(elements: lang.Iterable[TimestampedValue[String]], size: Int, window: TimeWindow, evictorContext: Evictor.EvictorContext): Unit={
        val iterator = elements.iterator()
        while(iterator.hasNext){
            val it = iterator.next()
            if(it.getValue.contains("error")){//将含有 error 的数据剔除
                iterator.remove()
            }
        }
    }
}
var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.socketTextStream("CentOS",7788)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.evictor(new ErrorEvictor(true))
.apply(new AllWindowFunction[String,String,TimeWindow] {
    override def apply(window: TimeWindow, input: Iterable[String], out: Collector[String]): Unit = {
        for(e <- input){
            out.collect(e)
        }
        print()
    }
})
.print()

fsEnv.execute("window")

Event Time

Flink在做窗口计算的时候支持以下语义的window:Processing time、Event time、Ingestion time

Processing time:使用处理节点时间,计算窗口

Event time:使用事件产生的时间计算窗口,最为精确

Ingestion time:数据进入到Flink的时间,一般是通过SourceFunction指定时间
Flink

默认情况下 Flink 使用的是 Processing Time,因此如果用户需要使用 Event time / Ingestion time,需要先设置时间属性:

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//window  操作
fsEnv.execute("event time")

一旦设置基于 Event Time 处理,用户必须声明水位线(watermark)的计算策略。系统会为每一个流计算出水位线时间 T,只有当窗口的 end time T' <= watermark(T) 时,窗口才会被触发。例如最大乱序时间为 2s 时,收到事件时间为 12:00:06 的记录后,水位线推进到 12:00:04,此时 end time 不超过 12:00:04 的窗口即被触发。Flink 将水位线的具体计算方式交给用户实现,系统并不提供默认实现。水位线的计算方式有两种:① 基于定时 Interval 周期性生成(推荐);② 基于记录触发,每来一条记录系统立即更新一次水位线。
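
下文"定时"与"基于记录"两个示例都使用 AccessLog 作为事件类型;如果前文尚未给出其定义,可以参考下面这个按示例字段假设的最小定义:

```scala
//假设的事件类型:channel 表示模块/渠道名称,timestamp 为事件产生时间(毫秒)
case class AccessLog(channel: String, timestamp: Long)
```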

定时

class AccessLogAssignerWithPeriodicWatermarks extends AssignerWithPeriodicWatermarks[AccessLog]{
  private var maxSeeTime:Long=0L      //目前观察到的最大事件时间
  private var maxOrderness:Long=2000L //允许的最大乱序时间 2s

  //由系统按 AutoWatermarkInterval 周期性调用,返回当前水位线
  override def getCurrentWatermark: Watermark = {
    new Watermark(maxSeeTime-maxOrderness)
  }

  //从记录中抽取事件时间,并更新最大事件时间
  override def extractTimestamp(element: AccessLog, previousElementTimestamp: Long): Long = {
    maxSeeTime=Math.max(maxSeeTime,element.timestamp)
    element.timestamp
  }
}

基于记录

class AccessLogAssignerWithPunctuatedWatermarks extends AssignerWithPunctuatedWatermarks[AccessLog]{
  private var maxSeeTime:Long=0L      //目前观察到的最大事件时间
  private var maxOrderness:Long=2000L //允许的最大乱序时间 2s

  //每来一条记录调用一次,返回的水位线若大于当前水位线则立即更新
  override def checkAndGetNextWatermark(lastElement: AccessLog, extractedTimestamp: Long): Watermark = {
    new Watermark(maxSeeTime-maxOrderness)
  }

  //从记录中抽取事件时间,并更新最大事件时间
  override def extractTimestamp(element: AccessLog, previousElementTimestamp: Long): Long = {
    maxSeeTime=Math.max(maxSeeTime,element.timestamp)
    element.timestamp
  }
}

Watermarker

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)//设置水位线定期计算频率 1s/每次
fsEnv.setParallelism(1)
//模块信息 时间
fsEnv.socketTextStream("CentOS",8888)
.map(line=> line.split("\\s+"))
.map(ts=>AccessLog(ts(0),ts(1).toLong))
.assignTimestampsAndWatermarks(new AccessLogAssignerWithPeriodicWatermarks)
.keyBy(accessLog=>accessLog.channel)
.window(TumblingEventTimeWindows.of(Time.seconds(4)))
.process(new ProcessWindowFunction[AccessLog,String,String,TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[AccessLog], out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        val window = context.window
        val currentWatermark = context.currentWatermark
        println("window:"+sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)+" \t watermarker:"+sdf.format(currentWatermark))
        for(e<-elements){
            val AccessLog(channel:String,timestamp:Long)=e
            out.collect(channel+"\t"+sdf.format(timestamp))
        }
    }
})
.print()

迟到数据处理

Flink 支持对迟到数据的处理:如果 watermark - window end time < allowed lateness,迟到记录仍可以参与窗口计算;否则 Flink 会将 too late 的数据丢弃。

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)//设置水位线定期计算频率 1s/每次
fsEnv.setParallelism(1)
//模块信息 时间
fsEnv.socketTextStream("CentOS",8888)
.map(line=> line.split("\\s+"))
.map(ts=>AccessLog(ts(0),ts(1).toLong))
.assignTimestampsAndWatermarks(new AccessLogAssignerWithPeriodicWatermarks)
.keyBy(accessLog=>accessLog.channel)
.window(TumblingEventTimeWindows.of(Time.seconds(4)))
.allowedLateness(Time.seconds(2))
.process(new ProcessWindowFunction[AccessLog,String,String,TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[AccessLog], out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        val window = context.window
        val currentWatermark = context.currentWatermark
        println("window:"+sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)+" \t watermarker:"+sdf.format(currentWatermark))
        for(e<-elements){
            val AccessLog(channel:String,timestamp:Long)=e
            out.collect(channel+"\t"+sdf.format(timestamp))
        }
    }
})
.print()

fsEnv.execute("event time")

Flink 默认对 too late 数据采取丢弃策略,如果用户想拿到这些过期数据,可以使用 side output(侧输出)方式:

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)//设置水位线定期计算频率 1s/每次
fsEnv.setParallelism(1)

val lateTag = new OutputTag[AccessLog]("latetag")
//模块信息 时间
val keyedWindowStream=fsEnv.socketTextStream("CentOS",8888)
.map(line=> line.split("\\s+"))
.map(ts=>AccessLog(ts(0),ts(1).toLong))
.assignTimestampsAndWatermarks(new AccessLogAssignerWithPeriodicWatermarks)
.keyBy(accessLog=>accessLog.channel)
.window(TumblingEventTimeWindows.of(Time.seconds(4)))
.allowedLateness(Time.seconds(2))
.sideOutputLateData(lateTag)
.process(new ProcessWindowFunction[AccessLog,String,String,TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[AccessLog], out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        val window = context.window
        val currentWatermark = context.currentWatermark
        println("window:"+sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)+" \t watermarker:"+sdf.format(currentWatermark))
        for(e<-elements){
            val AccessLog(channel:String,timestamp:Long)=e
            out.collect(channel+"\t"+sdf.format(timestamp))
        }
    }
})

keyedWindowStream.print("正常:")
keyedWindowStream.getSideOutput(lateTag).print("too late:")

fsEnv.execute("event time")

当一个算子的输入流中存在多个水位线时,系统在计算时取其中最小值作为当前水位线。例如两个上游分区的水位线分别为 12:00:05 和 12:00:03,则该算子当前的水位线为 12:00:03。

Joining

Window Join

基本语法

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)
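
下面几个 Join 示例中用到的 User、OrderItem 以及对应的水位线分配器本节没有给出定义;这里按照示例代码中的字段顺序补充一份假设的最小实现(思路与前文 AccessLogAssignerWithPeriodicWatermarks 相同),仅作示意:

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

//按示例中的字段顺序假设的两种事件类型
case class User(id: String, name: String, ts: Long)
case class OrderItem(uid: String, name: String, price: Double, ts: Long)

//User 流的周期性水位线分配器,最大乱序时间 2s(假设实现)
class UserAssignerWithPeriodicWatermarks extends AssignerWithPeriodicWatermarks[User] {
  private var maxSeeTime: Long = 0L
  private val maxOrderness: Long = 2000L
  override def getCurrentWatermark: Watermark = new Watermark(maxSeeTime - maxOrderness)
  override def extractTimestamp(element: User, previousElementTimestamp: Long): Long = {
    maxSeeTime = Math.max(maxSeeTime, element.ts)
    element.ts
  }
}

//OrderItem 流的周期性水位线分配器,最大乱序时间 2s(假设实现)
class OrderItemWithPeriodicWatermarks extends AssignerWithPeriodicWatermarks[OrderItem] {
  private var maxSeeTime: Long = 0L
  private val maxOrderness: Long = 2000L
  override def getCurrentWatermark: Watermark = new Watermark(maxSeeTime - maxOrderness)
  override def extractTimestamp(element: OrderItem, previousElementTimestamp: Long): Long = {
    maxSeeTime = Math.max(maxSeeTime, element.ts)
    element.ts
  }
}
```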

Tumbling Window Join

Flink

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)
fsEnv.setParallelism(1)
//001 zhangsan 1571627570000
val userStream = fsEnv.socketTextStream("CentOS",7788)
.map(line=>line.split("\\s+"))
.map(ts=>User(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
.setParallelism(1)
//001 apple 4.5 1571627570000L
val orderStream = fsEnv.socketTextStream("CentOS",8899)
                        .map(line=>line.split("\\s+"))
                        .map(ts=>OrderItem(ts(0),ts(1),ts(2).toDouble,ts(3).toLong))
                        .assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
                        .setParallelism(1)

userStream.join(orderStream)
            .where(user=>user.id)
            .equalTo(orderItem=> orderItem.uid)
            .window(TumblingEventTimeWindows.of(Time.seconds(4)))
            .apply((u,o)=>{
                (u.id,u.name,o.name,o.price,o.ts)
            })
.print()

fsEnv.execute("FlinkStreamSlidingWindowJoin")

Sliding Window Join

Flink

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)
fsEnv.setParallelism(1)
//001 zhangsan 1571627570000
val userStream = fsEnv.socketTextStream("CentOS",7788)
    .map(line=>line.split("\\s+"))
    .map(ts=>User(ts(0),ts(1),ts(2).toLong))
    .assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
    .setParallelism(1)
//001 apple 4.5 1571627570000L
val orderStream = fsEnv.socketTextStream("CentOS",8899)
    .map(line=>line.split("\\s+"))
    .map(ts=>OrderItem(ts(0),ts(1),ts(2).toDouble,ts(3).toLong))
    .assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
    .setParallelism(1)

userStream.join(orderStream)
.where(user=>user.id)
.equalTo(orderItem=> orderItem.uid)
.window(SlidingEventTimeWindows.of(Time.seconds(4),Time.seconds(2)))
.apply((u,o)=>{
    (u.id,u.name,o.name,o.price,o.ts)
})
.print()

fsEnv.execute("FlinkStreamTumblingWindowJoin")

Session Window Join

Flink

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)
fsEnv.setParallelism(1)
//001 zhangsan 1571627570000
val userStream = fsEnv.socketTextStream("CentOS",7788)
.map(line=>line.split("\\s+"))
.map(ts=>User(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
.setParallelism(1)
//001 apple 4.5 1571627570000L
val orderStream = fsEnv.socketTextStream("CentOS",8899)
.map(line=>line.split("\\s+"))
.map(ts=>OrderItem(ts(0),ts(1),ts(2).toDouble,ts(3).toLong))
.assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
.setParallelism(1)
userStream.join(orderStream)
.where(user=>user.id)
.equalTo(orderItem=> orderItem.uid)
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
.apply((u,o)=>{
    (u.id,u.name,o.name,o.price,o.ts)
})
.print()

fsEnv.execute("FlinkStreamSessionWindowJoin")

Interval Join

The interval join joins elements of two streams (we’ll call them A & B for now) with a common key and where elements of stream B have timestamps that lie in a relative time interval to timestamps of elements in stream A.
Flink

This can also be expressed more formally as b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)
fsEnv.setParallelism(1)
//001 zhangsan 1571627570000
val userStream = fsEnv.socketTextStream("CentOS",7788)
.map(line=>line.split("\\s+"))
.map(ts=>User(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
.setParallelism(1)
.keyBy(_.id)


//001 apple 4.5 1571627570000L
val orderStream = fsEnv.socketTextStream("CentOS",8899)
.map(line=>line.split("\\s+"))
.map(ts=>OrderItem(ts(0),ts(1),ts(2).toDouble,ts(3).toLong))
.assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
.setParallelism(1)
.keyBy(_.uid)


userStream.intervalJoin(orderStream)
.between(Time.seconds(-1),Time.seconds(1))
.process(new ProcessJoinFunction[User,OrderItem,String]{
    override def processElement(left: User, right: OrderItem, ctx: ProcessJoinFunction[User, OrderItem, String]#Context, out: Collector[String]): Unit = {
        println(left+" \t"+right)
        out.collect(left.id+" "+left.name+" "+right.name+" "+ right.price+" "+right.ts)
    }
})
.print()

fsEnv.execute("FlinkStreamSessionWindowJoin")

Flink HA

The JobManager coordinates every Flink deployment. It is responsible for both scheduling and resource management.

By default, there is a single JobManager instance per Flink cluster. This creates a single point of failure (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail.

With JobManager High Availability, you can recover from JobManager failures and thereby eliminate the SPOF. You can configure high availability for both standalone and YARN clusters.

Standalone Cluster High Availability

The general idea of JobManager high availability for standalone clusters is that there is a single leading JobManager at any time and multiple standby JobManagers to take over leadership in case the leader fails. This guarantees that there is no single point of failure and programs can make progress as soon as a standby JobManager has taken leadership. There is no explicit distinction between standby and master JobManager instances. Each JobManager can take the role of master or standby.
Flink

搭建过程

先决条件(略)

  • 安装JDK
  • 安装HADOOP HDFS-HA
  • 安装Zookeeper

Flink环境构建

  • 配置HADOOP_CLASSPATH
[aaa@qq.com ~]# vi .bashrc
HADOOP_HOME=/usr/hadoop-2.9.2
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
export HADOOP_HOME
HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CLASSPATH
[aaa@qq.com ~]# source .bashrc
[aaa@qq.com ~]# echo $HADOOP_CLASSPATH
/usr/hadoop-2.9.2/etc/hadoop:/usr/hadoop-2.9.2/share/hadoop/common/lib/*:/usr/hadoop-2.9.2/share/hadoop/common/*:/usr/hadoop-2.9.2/share/hadoop/hdfs:/usr/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/usr/hadoop-2.9.2/share/hadoop/hdfs/*:/usr/hadoop-2.9.2/share/hadoop/yarn/lib/*:/usr/hadoop-2.9.2/share/hadoop/yarn/*:/usr/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/usr/hadoop-2.9.2/share/hadoop/mapreduce/*:/usr/hadoop-2.9.2/contrib/capacity-scheduler/*.jar
  • 上传Flink,配置Flink
[aaa@qq.com ~]# tar -zxf flink-1.8.1-bin-scala_2.11.tgz -C /usr/
[aaa@qq.com ~]# cd /usr/flink-1.8.1
[aaa@qq.com flink-1.8.1]# vi conf/flink-conf.yaml
#==============================================================================
# Common
#==============================================================================
taskmanager.numberOfTaskSlots: 4
parallelism.default: 3
#==============================================================================
# High Availability
#==============================================================================
 high-availability: zookeeper
 high-availability.storageDir: hdfs:///flink/ha/
 high-availability.zookeeper.quorum: CentOSA:2181,CentOSB:2181,CentOSC:2181
 high-availability.zookeeper.path.root: /flink
 high-availability.cluster-id: /default_ns
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
 state.backend: rocksdb
 state.checkpoints.dir: hdfs:///flink-checkpoints
 state.savepoints.dir: hdfs:///flink-savepoints
 state.backend.incremental: true
[aaa@qq.com flink-1.8.1]# vi conf/masters
CentOSA:8081
CentOSB:8081
CentOSC:8081
[aaa@qq.com flink-1.8.1]# vi conf/slaves
CentOSA
CentOSB
CentOSC

启动Flink集群

[aaa@qq.com flink-1.8.1]# ./bin/start-cluster.sh
Starting HA cluster with 3 masters.
Starting standalonesession daemon on host CentOSA.
Starting standalonesession daemon on host CentOSB.
Starting standalonesession daemon on host CentOSC.
Starting taskexecutor daemon on host CentOSA.
Starting taskexecutor daemon on host CentOSB.
Starting taskexecutor daemon on host CentOSC.

等集群启动完成后,查看 JobManager 的日志,在 leader 主机上可以看到:

 http://xxx:8081 was granted leadership with leaderSessionID=f5338c3f-c3e5-4600-a07c-566e38bc0ff4

测试HA

登陆获取leadership的节点,然后执行以下指令

[aaa@qq.com flink-1.8.1]# ./bin/jobmanager.sh stop

查看其它节点的 JobManager 日志,按照上述方式找到输出 granted leadership 日志的节点,该节点就是接管后的新 leader(master)节点。
