
Flink

Overview

Earlier stream processing handled streams as micro-batches, which incurs relatively high latency.

Flink is a stateful computation framework built on data streams (Structured Streaming is also stateful) and is considered the third-generation big data analytics solution.

Storm entered the Apache Incubator in July 2013 and became a top-level Apache project in September 2014 (a rather slow incubation); at the time it was the mainstream real-time stream processing framework. With its low latency, high throughput, and exactly-once processing semantics, Storm was quickly adopted by major internet companies such as Twitter, Alibaba, Baidu, and iQiyi. Alibaba in particular maintained JStorm, its own fork of Storm, and used it for the vast majority of the stream processing workloads of its Taobao e-commerce business; JStorm was later handed over to the Storm community and folded into Storm 2.0. The mainstream choices for big data analytics in that era were static batch processing with MapReduce and real-time stream processing with Storm, which is why Hadoop and Storm together are usually called the first-generation big data solution.

Around 2009, Hadoop's MapReduce began to show its limits, which eventually led to MapReduce 2 with YARN as the compute resource manager.

Spark entered the Apache Incubator in June 2013 and became a top-level Apache project in February 2014. Spark grew so fast because its compute layer clearly outperforms Hadoop MapReduce's disk-based iterative computation, and because it unified batch and stream processing, lowering the barrier to entry for stream-processing developers. Ease of use, however, only saves companies staffing costs; it does not satisfy the real-time requirements that production stream computing actually has. Spark is therefore usually called the second-generation big data solution.

In April 2014 the Stratosphere code base was forked and donated to the Apache Software Foundation. The project finished incubation quickly and became a top-level Apache project in December 2014 (only with this illustrious background did it gain broad acceptance). In German, "flink" means fast and nimble.

First-generation big data stack: Hadoop MapReduce for static batch processing and Storm (top-level project since September 2014) for real-time stream processing - two independent compute engines.

Second generation: Spark RDD for static batch processing and Spark Streaming (DStream, top-level project since February 2014) for near-real-time streaming (with noticeable latency) - a unified compute engine that is easier to develop against.

Third generation: Apache Flink - the DataStream API for stream processing and the DataSet API for batch processing (batch built on top of streams); top-level project since December 2014.

Spark and Flink were born at almost the same time. The reason Flink only later became a mainstream big data solution is that, early on, most people's understanding of big data analytics (and most business scenarios) stayed in the batch-processing world, so Flink grew more slowly than Spark; only around 2017 did the industry start shifting from batch towards stream processing.

Further reading: https://blog.csdn.net/weixin_38231448/article/details/100062961

Stream-computing scenarios (stream processing is expected to dominate the coming decade): real-time analytics, system monitoring, public-opinion monitoring (very common), traffic prediction (government planning, road construction, live road conditions), the national power grid (state of every metering device), disease prediction (from air quality, environmental parameters, and historical case data), and banking/financial risk control (the large-scale financial failures of 2018 stemmed from insufficient monitoring - the goal is to spot anomalous activity and lock the account in time).

Spark VS Flink
Flink
At Flink's core is a streaming dataflow execution engine that provides data distribution, communication, and fault tolerance for distributed computation over data streams. On top of this engine, Flink offers several higher-level APIs for writing distributed jobs:

DataSet API: batch operations on static data. Static data is abstracted as a distributed data set that users can process with the rich set of operators Flink provides. Available in Java, Scala, and Python.

DataStream API: stream-processing operations on data streams. Streaming data is abstracted as a distributed data stream that users can transform with the various operators. Available in Java and Scala.

Table API: query operations on structured data. Structured data is abstracted as relational tables, which can be queried with a SQL-like DSL. Available in Java and Scala.

In addition, Flink provides domain-specific libraries, for example:

Flink ML: Flink's machine-learning library, providing a Pipelines API and implementations of many ML algorithms.

Gelly: Flink's graph-processing library, providing graph APIs and implementations of many graph algorithms.

IoT: on-site monitoring devices (a typical source of such streams).

Flink ML appeared relatively late (Flink itself became popular later than Spark) and is currently not on par with Spark's MLlib.

The overall big data architecture has largely stabilized: Hadoop as the storage infrastructure, with resource management mostly on YARN or the engine's own resource manager. The thing to focus on is the compute layer and the analytics libraries that branch off it.

Flink Architecture

Flink Concepts

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html

Tasks and Operator Chains (stage division)

For distributed execution, Flink chains operator subtasks together into Tasks (stages) according to the configured parallelism. A Flink job is usually split into several Tasks; each Task has its own parallelism, and each parallel instance corresponds to one thread, a subtask.

  • A Task is roughly the equivalent of a stage in a Spark job.
  • Operator Chain: Flink derives the Task boundaries by chaining operators, somewhat like Spark's narrow/wide dependencies. Operators are connected either forward (chainable) or via hash | rebalance (which breaks the chain); see the sketch below.
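
A minimal sketch (assuming the Scala DataStream API used throughout these notes) of how chaining can be influenced explicitly; startNewChain and disableChaining are DataStream methods, and the pipeline itself is only illustrative:

import org.apache.flink.streaming.api.scala._

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.socketTextStream("CentOS", 9999)
  .flatMap(_.split("\\s+"))     // forward connection: chained with its upstream by default
  .map((_, 1)).startNewChain()  // force this map to begin a new chain (still a forward connection)
  .keyBy(0)                     // hash repartition: breaks the chain, data is shuffled by key
  .sum(1).disableChaining()     // this operator is never chained with anything
  .print()

fsEnv.execute("chaining demo")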

Job Managers, Task Managers, Clients

JobManagers (Master) - coordinate the distributed computation. They schedule Tasks, coordinate checkpoints, and handle failure recovery; roughly the equivalent of Spark's Master plus Driver.

In Spark, the Master allocates compute resources and the Driver schedules tasks.

There is always at least one JobManager. A high-availability setup will have multiple JobManagers, one of which is always the leader while the others are standby.

TaskManagers (Slaves) - the worker nodes that actually execute the Tasks (i.e. run the subtask threads) and report node status and workload back to the JobManagers.

Client - unlike in Spark, the client is not part of the running computation; it only submits the dataflow (comparable to Spark's DAG) to the JobManager and may exit once submission is done. In Spark the client-side component is the Driver, which builds the DAG and also monitors the whole job's execution and failure recovery.

Task Slots and Resources

Spark expresses job capacity as total executor cores; Flink expresses it as task slots.

Within a single Flink job, subtasks from different stages may share the same slot; subtasks of different jobs may not share a slot.

Flink slots only partition memory; under YARN (MapReduce), tasks are processes, so CPU usage can be partitioned as well.

Each worker (TaskManager) is a JVM process that can execute one or more subtasks (threads). To control how many tasks a worker accepts, it has so-called task slots (at least one).

Each task slot represents a fixed subset of the TaskManager's resources. For example, on a TaskManager with 3 slots, each slot gets 1/3 of the TaskManager process's memory. Every job gets its own fixed number of task slots at startup, so this slot division prevents subtasks of different jobs from competing for memory.

The following shows a job that acquires 6 slots but runs only 5 subtask threads across 3 tasks.
By default, subtasks of different tasks (stages) of the same job may share a task slot. Also by default, the number of slots a job needs is determined by the highest parallelism among its tasks.

Source, Map (no state storage needed)

Slot sharing is enabled by default in Flink.

  • A Flink cluster needs exactly as many task slots as the highest parallelism used in the job.
  • It is easier to get better resource utilization. Without slot sharing, the non-intensive source/map() subtasks would block as many resources as the resource-intensive window subtasks. With slot sharing, the base parallelism can be raised from 2 to 6 while keeping the resource distribution shown in the figure; the sketch below shows how an operator can be placed in its own slot sharing group.
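
A minimal sketch (Scala DataStream API; the group name is illustrative) of placing a heavy operator into its own slot sharing group so it no longer shares slots with the default group:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.socketTextStream("CentOS", 9999)
  .flatMap(_.split("\\s+"))
  .map((_, 1))                         // stays in the "default" slot sharing group
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)
  .slotSharingGroup("heavy-window")    // the window aggregation gets its own group of slots
  .print()

fsEnv.execute("slot sharing demo")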

Flink Environment Setup

  • Prerequisites
    • HDFS up and running (passwordless SSH configured)
    • [root@CentOS ~]# start-dfs.sh
    • JDK 1.8+
  • Upload and extract Flink
[root@CentOS ~]# tar -zxf flink-1.8.1-bin-scala_2.11.tgz -C /usr/
  • Edit the flink-conf.yaml configuration file

[root@CentOS ~]# vi /usr/flink-1.8.1/conf/flink-conf.yaml

jobmanager.rpc.address: CentOS
taskmanager.numberOfTaskSlots: 4
parallelism.default: 3
  • Configure slaves
[root@CentOS ~]# vi /usr/flink-1.8.1/conf/slaves
CentOS
  • Start Flink
[root@CentOS ~]# cd /usr/flink-1.8.1/
[root@CentOS flink-1.8.1]# ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host CentOS.
Starting taskexecutor daemon on host CentOS.
[root@CentOS flink-1.8.1]# jps
2912 Jps
2841 TaskManagerRunner
2397 StandaloneSessionClusterEntrypoint

Open the web UI at http://centos:8081
Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/local_setup.html

[Aside: Struts2 date-type binding on a machine with a US-English locale can produce type-conversion errors;

work around it with a custom type converter or by setting the language parameter when starting Tomcat.]

Quick Start

  • Add the Maven dependencies (pom.xml)
<properties>
    <flink.version>1.8.1</flink.version>
    <scala.version>2.11</scala.version>
</properties>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
  • Client program
//1. Create the stream execution environment - remote submission | local execution
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2. Read data from an external system (sources are covered in detail later)
val lines:DataStream[String]=fsEnv.socketTextStream("CentOS",9999)
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
// print(fsEnv.getExecutionPlan)
//3. Execute the streaming job
fsEnv.execute("wordcount")
[root@CentOS flink-1.8.1]# ./bin/flink run --class com.baizhi.helloword.FlinkWordCount --detached --parallelism 3 /root/original-flink-1.0-SNAPSHOT.jar
Starting execution of program
Job has been submitted with JobID 221d5fa916523f88741e2abf39453b81
[root@CentOS flink-1.8.1]#
[root@CentOS flink-1.8.1]# ./bin/flink list -m CentOS:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
14.10.2019 17:15:31 : 221d5fa916523f88741e2abf39453b81 : wordcount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

  • Cancel the job
[root@CentOS flink-1.8.1]# ./bin/flink cancel -m CentOS:8081 221d5fa916523f88741e2abf39453b81
Cancelling job 221d5fa916523f88741e2abf39453b81.
Cancelled job 221d5fa916523f88741e2abf39453b81.

Deployment options

  • CLI script
[root@CentOS flink-1.8.1]# ./bin/flink run --class com.baizhi.demo01.FlinkWordCounts --detached --parallelism 3 /root/original-flink-1.0-SNAPSHOT.jar
  • Submit via the web UI
  • Remote (cross-platform) submission
val jarFiles="flink\\target\\original-flink-1.0-SNAPSHOT.jar" // for testing
val fsEnv = StreamExecutionEnvironment.createRemoteEnvironment("CentOS",8081,jarFiles)

Don't forget to build the jar first.
  • Local simulation
val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(3)
or
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment // auto-detects the runtime environment; generally used in production

DataStream API

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html

Data Sources

A Source is where the program reads its input. You attach a source with fsEnv.addSource(sourceFunction). Flink comes with many pre-implemented SourceFunctions, but you can always write your own: implement SourceFunction for non-parallel sources, or implement ParallelSourceFunction / extend RichParallelSourceFunction for parallel sources.

With Spark, a file source is read only once and later modifications are not picked up.
With Flink (in continuous-monitoring mode), a modification causes the file to be re-read from the beginning.

File-based

readTextFile(path) - reads the file line by line using the TextInputFormat specification and returns each line as a string.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

val lines:DataStream[String]=fsEnv.readTextFile("file:///E:\\demo\\words")

lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()

fsEnv.execute("wordcount")

readFile(fileInputFormat, path) - reads the file once (batch-like) according to the given file input format.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

val inputFormat=new TextInputFormat(null)
val lines:DataStream[String]=fsEnv.readFile(inputFormat,"file:///E:\\demo\\words")

lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()

fsEnv.execute("wordcount")

readFile(fileInputFormat, path, watchType, interval, pathFilter) - the method the two calls above delegate to internally.

It reads the files under the given path according to the given FileInputFormat.

Depending on watchType the path can be monitored periodically; watchType is either FileProcessingMode.PROCESS_CONTINUOUSLY or FileProcessingMode.PROCESS_ONCE, and the monitoring period is given by interval.

pathFilter lets the user exclude files under the path. With PROCESS_CONTINUOUSLY, once a file's content changes, the entire file content is reprocessed.

package com.baizhi.datasources

import org.apache.flink.api.common.io.FilePathFilter
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}

/*
Reads the files under a path with the given input format, continuously monitoring the path for changes
 */
object FlieBasedReadFile2 {
  def main(args: Array[String]): Unit = {
    //1. Create the stream execution environment
    val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(3) // local environment with parallelism 3 (for testing)
    //2. Read data from an external system - monitor the directory every 5s, keeping only .txt files
    val lines: DataStream[String] = fsEnv.readFile(new TextInputFormat(null),"file:///E:/demo/words",FileProcessingMode.PROCESS_CONTINUOUSLY,5000,new FilePathFilter {
      override def filterPath(path: Path): Boolean = {
        path.getPath.endsWith(".txt")
      }
    })
    //process the DataStream
    lines.flatMap(_.split("\\s+"))// curried call; requires the implicit imports from the scala package object
      .map((_,1))
      .keyBy(0)
      .sum(1)
      .print()

    //print(fsEnv.getExecutionPlan)

    //3. Execute the streaming job
    fsEnv.execute("WordCount")
  }
}

Important notes:

  1. If watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, then when a file is modified its content is reprocessed in full. This can break exactly-once semantics, because appending data to the end of a file causes all of its contents to be reprocessed.
  2. If watchType is set to FileProcessingMode.PROCESS_ONCE, the source scans the path once and exits, without waiting for the readers to finish reading the file contents. The readers, of course, keep reading until all content has been read. Closing the source means there are no more checkpoints after that point, which can make recovery after a node failure slower, because the job resumes reading from the last checkpoint.

Socket-based

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

val lines:DataStream[String]=fsEnv.socketTextStream("CentOS",9999)
lines.flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(t=>t._1)
    .sum(1)
    .print()

fsEnv.execute("wordcount")

Collection-based (for testing)

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=fsEnv.fromCollection(List("this is a demo","good good"))
                            // fsEnv.fromElements("this is a demo","good good")

lines.flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(t=>t._1)
    .sum(1)
    .print()

fsEnv.execute("wordcount")

Custom Source

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

import scala.util.Random

class CustomSourceFunction extends ParallelSourceFunction[String]{
  @volatile
  var isRunning:Boolean = true
  val lines:Array[String] = Array("this is a demo","hello word","are you ok")
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while(isRunning){
      Thread.sleep(1000)
      ctx.collect(lines(new Random().nextInt(lines.length)))// emit the record downstream
    }
  }

  override def cancel(): Unit = {
    isRunning=false
  }
}
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=fsEnv.addSource[String](new CustomSourceFunction)
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
fsEnv.execute("wordcount")

√FlinkKafkaConsumer

# start HDFS
[root@CentOS ~]# start-dfs.sh
# start Flink
[root@CentOS ~]# cd /usr/flink-1.8.1/
[root@CentOS flink-1.8.1]# ./bin/start-cluster.sh
# start ZooKeeper
[root@CentOS flink-1.8.1]# cd /usr/zookeeper-3.4.6/
[root@CentOS zookeeper-3.4.6]# bin/zkServer.sh start conf/zoo.cfg
# start Kafka (it exits immediately if ZooKeeper is not running)
[root@CentOS ~]#  cd /usr/kafka_2.11-2.2.0/
[root@CentOS kafka_2.11-2.2.0]# bin/kafka-server-start.sh -daemon config/server.properties
[root@CentOS kafka_2.11-2.2.0]# jps
5600 QuorumPeerMain
2884 TaskManagerRunner
5893 Kafka
2439 StandaloneSessionClusterEntrypoint
1721 SecondaryNameNode
1531 DataNode
1438 NameNode
5967 Jps

[root@CentOS flink-1.8.1]# cd /usr/kafka_2.11-2.2.0/
# list topics
[root@CentOS kafka_2.11-2.2.0]# bin/kafka-topics.sh --list  --bootstrap-server CentOS:9092

# create a topic
[root@CentOS kafka_2.11-2.2.0]#  bin/kafka-topics.sh --create --topic topic1 --partitions 1 --replication-factor 1 --bootstrap-server CentOS:9092

# produce messages
[root@CentOS kafka_2.11-2.2.0]# bin/kafka-console-producer.sh --topic topic1 --broker-list CentOS:9092
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
package com.baizhi.datasources

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object FlinkKafkaConsumer {
  def main(args: Array[String]): Unit = {
    //1. Create the stream execution environment
    val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(3) // local environment with parallelism 3 (for testing)
    //2. Read data from Kafka
    val props = new Properties()
    props.setProperty("bootstrap.servers", "CentOS:9092")
    props.setProperty("group.id", "g1")
    val lines= fsEnv.addSource(new FlinkKafkaConsumer("topic1",new SimpleStringSchema(),props))
    //process the DataStream
    lines.flatMap(_.split("\\s+"))// requires the implicit imports
      .map((_,1))
      .keyBy(0)
      .sum(1)
      .print()

    //print(fsEnv.getExecutionPlan)

    //3. Execute the streaming job
    fsEnv.execute("WordCount")
  }
}

SimpleStringSchema only gives you the value. If you want more information, such as key/value/partition/offset, you can implement your own KafkaDeserializationSchema subclass to customize deserialization.

package com.baizhi.datasources

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.streaming.api.scala._// do not forget this import
class UserKafkaDeserializationSchema extends KafkaDeserializationSchema[(String,String)]{
  // always returns false: the stream never ends
  override def isEndOfStream(t: (String, String)): Boolean = {
    false
  }
  }

  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
    var key = ""
    if(consumerRecord.key()!=null&&consumerRecord.key().size!=0){
      key = new String(consumerRecord.key())
    }
    val value = new String(consumerRecord.value())
    (key,value)
  }

  // tell Flink the element type of the tuple
  override def getProducedType: TypeInformation[(String, String)] = {
    createTypeInformation[(String, String)]
  }
}

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")

val lines:DataStream[(String,String)]=fsEnv.addSource(new FlinkKafkaConsumer("topic01",new UserKafkaDeserializationSchema(),props))

lines.map(t=>t._2).flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()

fsEnv.execute("wordcount")

If Kafka holds JSON string data, you can use the JSON-aware schemas that ship with Flink. Recommended:

  • JsonDeserializationSchema: requires the value to be a JSON string. [The author could not find this class in the version used here.]
  • JSONKeyValueDeserializationSchema(includeMetadata): requires both key and value to be JSON, and can optionally carry metadata (partition, offset, ...).
package com.baizhi.datasources

import java.util.Properties

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
object FlinkKafkaConsumer {
  def main(args: Array[String]): Unit = {
    //1. Create the stream execution environment
    val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(3) // local environment with parallelism 3 (for testing)
    //2. Read data from Kafka
    val props = new Properties()
    props.setProperty("bootstrap.servers", "CentOS:9092")
    props.setProperty("group.id", "g1")
    // `true` includes the metadata: the offset/partition/topic of each message
    val jsonData: DataStream[ObjectNode] = fsEnv.addSource(new FlinkKafkaConsumer("topic1",new JSONKeyValueDeserializationSchema(true),props))
    //process the DataStream
    jsonData.map(on=>(on.get("value").get("id").asInt(),on.get("value").get("name")))
      .print()

    //3. Execute the streaming job
    fsEnv.execute("WordCount")
  }
}

Data Sinks

Data Sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a number of predefined sinks.

file-based

write*: writeAsText / writeAsCsv(...) / writeUsingOutputFormat. Note that the write*() methods on DataStream are mainly intended for debugging.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")

fsEnv.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
    .flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(0)
    .sum(1)
    .writeAsText("file:///E:/results/text",WriteMode.OVERWRITE)

fsEnv.execute("wordcount")

The code above only provides an at-least-once guarantee. In production it is recommended to write to external systems with the flink-connector-filesystem connector, which can guarantee exactly-once.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")
//BucketingSink is deprecated in 1.9 - use StreamingFileSink instead
val bucketingSink = new BucketingSink[(String,Int)]("hdfs://CentOS:9000/BucketingSink")
bucketingSink.setBucketer(new DateTimeBucketer("yyyyMMddHH"))// bucket (sub-directory) format
bucketingSink.setBatchSize(1024)
fsEnv.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
    .flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(0)
    .sum(1)
    .addSink(bucketingSink)
    .setParallelism(6)

fsEnv.execute("wordcount")
package com.baizhi.datasinks

import java.util.Properties

import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object FlieBasedWrite {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.setProperty("bootstrap.servers", "CentOS:9092")
    props.setProperty("group.id", "g1")
    //BucketingSink is deprecated in 1.9 - use StreamingFileSink instead
    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("hdfs://CentOS:9000/StreamingFileSink"), new SimpleStringEncoder[String]("UTF-8"))
      .build()
    fsEnv.addSource(new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema(),props))
      .flatMap(_.split("\\s+"))
      .addSink(sink)
      .setParallelism(6)

    fsEnv.execute("wordcount")
  }
}

print()/ printToErr()

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")

fsEnv.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print("test")// optional prefix, useful for telling streams apart when several of them print to the console
//.printToErr("test")
.setParallelism(2)

fsEnv.execute("wordcount")

Custom Sink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class  CustomSinkFunction extends  RichSinkFunction[(String,Int)]{
  override def open(parameters: Configuration): Unit = {
    println("open connection")
  }
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    println(value)
  }

  override def close(): Unit = {
    println("close connection")
  }
}
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")

fsEnv.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
    .flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(0)
    .sum(1)
    .addSink(new CustomSinkFunction)

fsEnv.execute("wordcount")

√ RedisSink

 cd /usr/local/bin/
  
  # start the Redis server
  [root@CentOS ~]# cd /usr/local/bin/
  [root@CentOS bin]# redis-server redis.conf
  # start the Redis CLI
  [root@CentOS bin]# redis-cli
  # stop the Redis server
  [root@CentOS bin]# redis-cli shutdown
  # verify
  127.0.0.1:6379> keys *
  
  # flush all databases
  127.0.0.1:6379> flushall
  • Add the dependency
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
class UserRedisMapper extends RedisMapper[(String,Int)]{
  // which Redis command / data structure to write with
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET,"wordcount")
  }

  override def getKeyFromData(data: (String, Int)): String = {
    data._1
  }

  override def getValueFromData(data: (String, Int)): String = {
    data._2.toString
  }
}
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")

val jedisConfig=new FlinkJedisPoolConfig.Builder()
    .setHost("CentOS")
    .setPort(6379)
    .build()

fsEnv.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
    .flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(0)
    .sum(1)
    .addSink(new RedisSink[(String, Int)](jedisConfig,new UserRedisMapper))

fsEnv.execute("wordcount")

127.0.0.1:6379> hgetall wordcount

√FlinkKafkaProducer

Writing the results to Kafka.

[root@CentOS zookeeper-3.4.6]# cd /usr/kafka_2.11-2.2.0/
# list topics
[root@CentOS kafka_2.11-2.2.0]# bin/kafka-topics.sh --list  --bootstrap-server CentOS:9092

# create the topics
[root@CentOS kafka_2.11-2.2.0]#  bin/kafka-topics.sh --create --topic topic1 --partitions 1 --replication-factor 1 --bootstrap-server CentOS:9092
[root@CentOS kafka_2.11-2.2.0]#  bin/kafka-topics.sh --create --topic topic2 --partitions 1 --replication-factor 1 --bootstrap-server CentOS:9092

# produce messages
[root@CentOS kafka_2.11-2.2.0]# bin/kafka-console-producer.sh --topic topic1 --broker-list CentOS:9092

# consume messages
[root@CentOS kafka_2.11-2.2.0]# 
bin/kafka-console-consumer.sh --topic topic2 --bootstrap-server CentOSAA:9092,CentOSBB:9092,CentOSCC:9092 --property print.key=true --property print.value=true 
class UserKeyedSerializationSchema extends KeyedSerializationSchema[(String,Int)]{

  override def serializeKey(element: (String, Int)): Array[Byte] = {
    element._1.getBytes()
  }

  override def serializeValue(element: (String, Int)): Array[Byte] = {
    element._2.toString.getBytes()
  }

  //can override the default topic; returning null writes to the default topic passed to the producer
  override def getTargetTopic(element: (String, Int)): String = {
    null
  }
}

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

val props1 = new Properties()
props1.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOS:9092")
props1.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1")

val props2 = new Properties()
props2.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOS:9092")
props2.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"100")
props2.setProperty(ProducerConfig.LINGER_MS_CONFIG,"500")
props2.setProperty(ProducerConfig.ACKS_CONFIG,"all")
props2.setProperty(ProducerConfig.RETRIES_CONFIG,"2")


fsEnv.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props1))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new FlinkKafkaProducer[(String, Int)]("topic02",new UserKeyedSerializationSchema,props2))

fsEnv.execute("wordcount")

DataStream Transformations

Map

Takes one element and produces one element.

dataStream.map { x => x * 2 }

FlatMap

Takes one element and produces zero, one, or more elements.

dataStream.flatMap { str => str.split(" ") }

Filter

Evaluates a boolean function for each element and retains those for which the function returns true.

dataStream.filter { _ != 0 }

Union

Union of two or more data streams creating a new stream containing all the elements from all the streams.

dataStream.union(otherStream1, otherStream2, ...)

Connect

"Connects" two data streams, retaining their types and allowing shared state between the two streams.

val stream1 = fsEnv.socketTextStream("CentOS",9999)
val stream2 = fsEnv.socketTextStream("CentOS",8888)

stream1.connect(stream2).flatMap(line=>line.split("\\s+"),line=>line.split("\\s+"))
    .map(Word(_,1))
    .keyBy("word")
    .sum("count")
    .print()

Split

Splits the stream into two or more streams according to some criterion.

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)               

Select

Select one or more streams from a split stream.

package com.baizhi.transformations

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}

object Split {
  def main(args: Array[String]): Unit = {
    //1. Create the stream execution environment
    val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(3) // local environment with parallelism 3 (for testing)
    //2. Read data from an external system
    val stream1: DataStream[String] = fsEnv.socketTextStream("CentOS",9999)
    //process the DataStream
    val splitStream = stream1.map(Integer.parseInt(_)).split((line:Int) => {
      if (line % 2 == 0) {
        List("error") // branch name
      } else {
        List("info") // branch name
      }
    })
    splitStream.select("error").print("error")
    splitStream.select("info").print("info")

    //3. Execute the streaming job
    fsEnv.execute("WordCount")
  }
}

Side Out

Side outputs let you emit additional tagged streams alongside the main stream.

package com.baizhi.transformations

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment, _}
import org.apache.flink.util.Collector

object SideOut {
  def main(args: Array[String]): Unit = {
    //1. Create the stream execution environment
    val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(3) // local environment with parallelism 3 (for testing)
    //2. Read data from an external system
    val lines: DataStream[String] = fsEnv.socketTextStream("CentOS",9999)
    //process the DataStream

    //define the side-output tag
    val outTag: OutputTag[String] = new OutputTag[String]("error")

    val results = lines.process(new ProcessFunction[String, String] {
      override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
        if (value.contains("error")) {
          ctx.output(outTag, value)
        } else {
          out.collect(value)
        }
      }
    })
    results.print("normal")
    //fetch the side output
    results.getSideOutput(outTag).print("error")

    //3. Execute the streaming job
    fsEnv.execute("WordCount")
  }
}

KeyBy

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning.

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

Reduce

A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

fsEnv.socketTextStream("CentOS",9999)
        .flatMap(_.split("\\s+"))
        .map((_,1))
        .keyBy(0)
        .reduce((t1,t2)=>(t1._1,t1._2+t2._2))
        .print()

Fold

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

package com.baizhi.transformations

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment, _}
import org.apache.flink.util.Collector

object Fold {
  def main(args: Array[String]): Unit = {
    //1. Create the stream execution environment
    val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(3) // local environment with parallelism 3 (for testing)
    //2. Read data from an external system
    val lines: DataStream[String] = fsEnv.socketTextStream("CentOS",9999)
    //process the DataStream

    lines.flatMap(_.split("\\s+"))
        .map((_,1))
        .keyBy(0)
      //    initial value   (t1,t2) = (accumulated tuple, incoming tuple)
        .fold(("",0))((t1,t2)=>(t2._1,t1._2+t2._2))
        .print()

    //3. Execute the streaming job
    fsEnv.execute("WordCount")
  }
}

Aggregations

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in the given field (the same holds for max and maxBy).

zhangsan 001 1000
wangw 001 1500
zhaol 001 800
fsEnv.socketTextStream("CentOS",9999)
    .map(_.split("\\s+"))
    .map(ts=>(ts(0),ts(1),ts(2).toDouble))
    .keyBy(1)
    .minBy(2)//emits the whole record that holds the minimum value
    .print()
1> (zhangsan,001,1000.0)
1> (zhangsan,001,1000.0)
1> (zhaol,001,800.0)
fsEnv.socketTextStream("CentOS",9999)
    .map(_.split("\\s+"))
    .map(ts=>(ts(0),ts(1),ts(2).toDouble))
    .keyBy(1)
    .min(2)
    .print()
1> (zhangsan,001,1000.0)
1> (zhangsan,001,1000.0)
1> (zhangsan,001,800.0)

State and Fault Tolerance (key topic)

Stateful operators need to remember information across the DataStream elements or events they process, which makes state play a very important role in fine-grained Flink computations:

  • remembering data/state from some past period up to the current time;
  • keeping the pending aggregates when summarizing events per minute/hour/day;
  • holding the current version of the model parameters when training a machine-learning model.

Flink manages state so that it can be made fault-tolerant with checkpoints and savepoints. When the scale of the computation changes, Flink can automatically redistribute state across the parallel instances. Under the hood, state is stored through a StateBackend, which decides how and where the state lives (covered in a later section).

With Spark streaming checkpoints, modified code does not take effect on restore; Flink, even with checkpoints enabled, redistributes state when the scale of the computation changes.

  • Where state is stored (StateBackend)
    1. By default (for testing) Flink keeps state in the JobManager's memory.

2. FileSystem: every TaskManager keeps state in memory and backs it up to a file system (production; cannot handle extremely large state).

3. RocksDB: memory plus local disk, backed up to a remote file system (production; suited to very large state).
Queryable State (since Flink 1.7.1) lets external programs access the streaming state, similar to Storm's DRPC.

Flink divides all manageable state into Keyed State and Operator State. Keyed State is bound one-to-one to a key and can only be used on a KeyedStream; all state operations on non-keyed streams are Operator State. Internally, Keyed State is associated with the pair <parallel-operator-instance, key>, and since a given key lands in exactly one operator instance, you can simply think of Keyed State as bound to <operator, key>. Flink manages Keyed State in Key Groups; every keyed operator may interact with one or more Key Groups when it reads or writes state.

When Flink redistributes keyed state, the unit of distribution is not the individual key: the Key Group is the smallest unit that gets redistributed (see the sketch below for how the number of Key Groups is fixed via the maximum parallelism).
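
A minimal sketch (Scala DataStream API; the values are illustrative) of pinning the number of Key Groups by setting the maximum parallelism, which is what allows later rescaling without re-hashing individual keys:

import org.apache.flink.streaming.api.scala._

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
// the maximum parallelism fixes the number of Key Groups for keyed state;
// the actual parallelism can later be scaled anywhere up to this bound
fsEnv.setMaxParallelism(128)
fsEnv.setParallelism(3)

fsEnv.socketTextStream("CentOS", 9999)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy(0)          // the keyed state below is partitioned by Key Group
  .sum(1)
  .print()

fsEnv.execute("max parallelism demo")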

Operator State (also called non-keyed state) is bound to one parallel operator instance. Both Keyed State and Operator State exist in two forms: managed and raw. All built-in Flink operators support managed state; raw state is only used in user-defined operators and does not support state redistribution when the parallelism changes. So although Flink supports raw state, managed state is what is used in the vast majority of cases.

State bound to a key is Keyed State [used constantly in practice, and the most important part of Flink]: one key maps to one state (a group of variables).
Everything else is Operator State, which is bound only to the operator.

Keyed-state

The keyed-state interfaces give access to different types of state, all scoped to the key of the current input element.

Type                      | Description                                                                                               | Methods
ValueState<T>             | Holds a single value that can be updated.                                                                 | update(T), T value(), clear()
ListState<T>              | Holds a list of elements.                                                                                 | add(T), addAll(List<T>), Iterable<T> get(), update(List<T>), clear()
ReducingState<T>          | Holds a single value that represents the aggregation of all values added; requires a ReduceFunction.     | add(T), T get(), clear()
AggregatingState<IN, OUT> | Holds a single value that represents the aggregation of all values added; requires an AggregateFunction. | add(IN), OUT get(), clear()
FoldingState<T, ACC>      | Holds a single value that represents the aggregation of all values added; requires a FoldFunction.       | add(T), ACC get(), clear()
MapState<UK, UV>          | Holds a map.                                                                                              | put(UK, UV), putAll(Map<UK, UV>), entries(), keys(), values(), clear()

value state

package com.baizhi.keyedstate

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object ValueState {
  def main(args: Array[String]): Unit = {
    var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment

    fsEnv.socketTextStream("CentOS",9999)
      .flatMap(_.split("\\s+"))
      .map((_,1))
      .keyBy(0)
      //                      input type     output type
      .map(new RichMapFunction[(String,Int),(String,Int)] {
      //holds the state handle
        var vs:ValueState[Int] = _
      //initialize the state
      override def open(parameters: Configuration): Unit = {
        val vsd = new ValueStateDescriptor[Int]("valueCount",createTypeInformation[Int])
        vs = getRuntimeContext.getState[Int](vsd)
      }
        override def map(in: (String, Int)): (String, Int) = {
          val historyCount = vs.value()
          val currentCount = historyCount+in._2
          vs.update(currentCount)
          (in._1,currentCount)
        }
      })
      .print()

    fsEnv.execute("wordCount")
  }
}

AggregatingState<IN, OUT>

package com.baizhi.keyedstate

import org.apache.flink.api.common.functions.{AggregateFunction, RichMapFunction}
import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object AggregatingState {
  def main(args: Array[String]): Unit = {
    var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment
    // a 1
    // a 3
    fsEnv.socketTextStream("CentOS",9999)
      .map(_.split("\\s+"))
      .map(ts=>(ts(0),ts(1).toInt))
      .keyBy(0)
      //                      input type     output type
      .map(new RichMapFunction[(String,Int),(String,Double)] {
      //holds the state handle
        var vs:AggregatingState[Int,Double] = _
      //initialize the state
      override def open(parameters: Configuration): Unit = {
                                              //input  accumulator  output
        val vsd = new AggregatingStateDescriptor[Int,(Double,Int),Double]("avgCount",new AggregateFunction[Int,(Double,Int),Double]{
          override def createAccumulator(): (Double, Int) = {
            (0.0,0)//(running sum, count)
          }

          override def add(value: Int, accumulator: (Double, Int)): (Double, Int) = {
            (accumulator._1+value,accumulator._2+1)
          }

          override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) = {
            (a._1+b._1,a._2+b._2)
          }

          override def getResult(accumulator: (Double, Int)): Double = {
            accumulator._1/accumulator._2
          }
        },createTypeInformation[(Double,Int)])
        vs = getRuntimeContext.getAggregatingState(vsd)
      }
        override def map(in: (String, Int)): (String, Double) = {
          vs.add(in._2)
          val avgCount: Double = vs.get()
          (in._1,avgCount)//return (key, running average for that key)
        }
      })
      .print()

    fsEnv.execute("wordCount")
  }
}

MapState<UK, UV>

Use case: alerting on logins from a new location.

case class Login (id:String,name:String,ip:String,city:String,loginTime:String)
package com.baizhi.keyedstate

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import scala.collection.JavaConverters._

object MapState {
  def main(args: Array[String]): Unit = {
    var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment
    //e.g. 001 zhansan 202.15.10.12 Japan 2019-10-10
    fsEnv.socketTextStream("CentOS",9999)
      .map(_.split("\\s+"))
      .map(ts=>Login(ts(0),ts(1),ts(2),ts(3),ts(4)))
      .keyBy("id","name")
      //                      input type   output type
      .map(new RichMapFunction[Login,String] {
      /*state declaration: a field that holds the state handle*/
        var vs:MapState[String,String] = _
      //initialize the state
      override def open(parameters: Configuration): Unit = {
          /*create the state descriptor and obtain the state*/
        val msd = new MapStateDescriptor[String,String]("mapCount",createTypeInformation[String],createTypeInformation[String])
        vs = getRuntimeContext.getMapState(msd)
      }
          /*state access*/
        override def map(in: Login): String = {
          println("login history")
          for (k<- vs.keys().asScala){// needs the JavaConverters import to iterate like this
            println(k+" "+vs.get(k))// print the stored history
          }

          var result =""
          if(vs.keys().iterator().asScala.isEmpty){// no keys yet: first login
            result = "ok"
          }else{
            if(!in.city.equalsIgnoreCase(vs.get("city"))){// city differs from the last login: remote login
              result="error"
            }else{// same city as before: fine
              result="ok"
            }
          }

          vs.put("ip",in.ip)
          vs.put("city",in.city)
          vs.put("loginTime",in.loginTime)

          result
        }
      })
      .print()

    fsEnv.execute("wordCount")
  }
}

Summary

new Rich[Map|FlatMap]Function {
    var vs:XxxState=_ // state declaration
    override def open(parameters: Configuration): Unit = {
        val xxd=new XxxStateDescription // initialize the state
        vs=getRuntimeContext.getXxxState(xxd)
    }
    override def xxx(value: Xx): Xxx = {
       // state operations
    }
}
  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
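
The notes above show ValueState, AggregatingState, and MapState in action. As a complementary sketch (assuming the same Scala DataStream word-count pipeline used throughout, applied after keyBy), ReducingState can replace the hand-written ValueState counter:

import org.apache.flink.api.common.functions.{ReduceFunction, RichMapFunction}
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

class ReducingWordCount extends RichMapFunction[(String, Int), (String, Int)] {
  var vs: ReducingState[Int] = _ // state declaration

  override def open(parameters: Configuration): Unit = {
    // the ReduceFunction folds every added value into the stored aggregate
    val rsd = new ReducingStateDescriptor[Int]("wordCount",
      new ReduceFunction[Int] { override def reduce(v1: Int, v2: Int): Int = v1 + v2 },
      createTypeInformation[Int])
    vs = getRuntimeContext.getReducingState(rsd)
  }

  override def map(in: (String, Int)): (String, Int) = {
    vs.add(in._2)        // state operation: fold the new count into the aggregate
    (in._1, vs.get())    // emit the running total for this key
  }
}

Usage would mirror the ValueState example above: .keyBy(0).map(new ReducingWordCount).print().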

Application scenarios for state: access/permission control - monitor user status with real-time stream computation and, once a limit is exceeded, route the user into a penalty flow; or coupon pushing.

RocksDB is a memory-plus-disk store whose write performance is even higher than Redis; Kafka uses RocksDB for its local state storage as well.

This is analogous to Tomcat session management (state is assigned and managed per Key Group).
Interview question: why does state recovery or parallelism scaling not disturb Flink's state maintenance?
Answer: keyed state is stored in blocks by Key Group, and recovery simply hands each operator its Key Groups (the groups themselves never change). Key Groups are only reassigned; they would only be recomputed if the parallelism exceeded the maximum parallelism.

Every operator instance has its own state - how is it stored when the state is persisted?
Operator state is a List (the elements from all instances are merged into one big list) and is redistributed using one of two strategies:
[Even-split: the list is split evenly across the instances]
[Union: all state entries, e.g. all 6 values, are handed to each of the 3 instances]
Example: the Kafka consumer source keeps a List of (partition, offset) entries, and on restore each instance reads the offsets of the partitions it owns.

State Time-To-Live(TTL)

Basic usage

A time-to-live (TTL) can be assigned to keyed state of any type. If a TTL is configured and a state value has expired, Flink removes the stored value on a best-effort basis.

// basic TTL configuration
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
    
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
  • Example
package com.baizhi.statetimetolive

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.StateTtlConfig.{StateVisibility, UpdateType}
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object StateTTL {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.socketTextStream("CentOS",9999)
      .flatMap(_.split("\\s+"))
      .map((_,1))
      .keyBy(0)
      .map(new RichMapFunction[(String,Int),(String,Int)] {
        var vs:ValueState[Int]=_
        override def open(parameters: Configuration): Unit = {
          val vsd = new ValueStateDescriptor[Int]("valueCount",createTypeInformation[Int])
          val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) // TTL of 5 seconds
            .setUpdateType(UpdateType.OnCreateAndWrite) // refresh the TTL on creation and on every write
            .setStateVisibility(StateVisibility.NeverReturnExpired) // never return expired values
            .build()
          vsd.enableTimeToLive(ttlConfig)
          vs = getRuntimeContext.getState[Int](vsd)
        }
        override def map(value: (String, Int)): (String, Int) = {
          val historyCount = vs.value()
          val currentCount=historyCount+value._2
          vs.update(currentCount)
          (value._1,currentCount)
        }
      }).print()

    fsEnv.execute("wordCount")
  }
}

Note: once TTL is enabled, the system spends extra memory to store a timestamp (processing time) with each value. If TTL was not enabled before and you enable it in the code before restarting, the job cannot restore its state and fails with a compatibility failure / StateMigrationException.

Clearing expired state

By default, expired values are only removed when they are explicitly read, e.g. by calling ValueState.value(). This means that if expired state is never read, it is never deleted, so the state may keep growing.

Cleanup in full snapshot

When recovering from a previous snapshot, the system loads the full state snapshot and drops the expired entries while loading. This does not shrink the state data on disk; that data is only overwritten at the next checkpoint. It still does not solve the problem of automatically removing expired-but-unread data at runtime.

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build

This only applies to the memory and fs state backends; it is not supported by the RocksDB state backend.

Cleanup in background

Background cleanup can be enabled; the default cleanup strategy depends on the state backend implementation (different backends clean up differently).

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInBackground
    .build
  • Incremental cleanup (for the memory-based backends)
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
              .setUpdateType(UpdateType.OnCreateAndWrite)
              .setStateVisibility(StateVisibility.NeverReturnExpired)
              .cleanupIncrementally(100,true) // defaults: 5 | false
              .build()

The first argument is how many entries are examined per cleanup trigger; a cleanup is triggered whenever any state access is performed. If the second argument is true, a cleanup is additionally triggered for every processed record, even if the state is not accessed.

  • RocksDB compaction

RocksDB (a key-value store) compacts state asynchronously in the background, merging entries with the same key to reduce the state file size. Compaction by itself does not clean up expired state, but a compaction filter can be configured so that RocksDB drops expired entries during compaction. This filter is disabled by default; it can be enabled in flink-conf.yaml with state.backend.rocksdb.ttl.compaction.filter.enabled: true or via RocksDBStateBackend::enableTtlCompactionFilter.

import org.apache.flink.api.common.state.StateTtlConfig 
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
              .setUpdateType(UpdateType.OnCreateAndWrite)
              .setStateVisibility(StateVisibility.NeverReturnExpired)
              .cleanupInRocksdbCompactFilter(1000)// default: 1000
              .build()

The value 1000 means that during compaction the system checks, per 1000 processed entries, whether entries have expired and removes the expired data.

Operator State

To use operator state, implement the generic CheckpointedFunction interface or the ListCheckpointed interface. Note that operator state currently only supports list-style state: what is stored must be a List, and its elements must be serializable.

CheckpointedFunction

It offers two different state redistribution schemes: Even-split and Union.

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
  • snapshotState(): called when a checkpoint is taken, to snapshot the state.
  • initializeState(): called on the first start or when recovering from a previous checkpoint.

Even-split: on recovery, the elements of the operator state are split evenly across all operator instances; each instance gets a sub-list.

Union: on recovery, every operator instance gets the complete operator state.
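
A minimal sketch of how the two schemes are selected inside initializeState (the class and descriptor names are illustrative; in practice this interface is implemented together with a sink or source function, as in the example below); getListState gives even-split redistribution, getUnionListState gives union redistribution:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.scala._

class RedistributionDemo extends CheckpointedFunction {
  override def snapshotState(context: FunctionSnapshotContext): Unit = {}

  // normally you pick ONE of the two registrations below
  override def initializeState(context: FunctionInitializationContext): Unit = {
    // Even-split redistribution: each parallel instance receives a sub-list on recovery
    val evenSplit: ListState[(String, Int)] = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[(String, Int)]("even-split-demo", createTypeInformation[(String, Int)]))

    // Union redistribution: each parallel instance receives the complete list on recovery
    val union: ListState[(String, Int)] = context.getOperatorStateStore.getUnionListState(
      new ListStateDescriptor[(String, Int)]("union-demo", createTypeInformation[(String, Int)]))
  }
}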

Example

class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]  with CheckpointedFunction  {
    var listState:ListState[(String,Int)]=_
    val bufferedElements = ListBuffer[(String, Int)]()
    //writes the buffered data to the external system
    override def invoke(value: (String, Int)): Unit = {
        bufferedElements += value
        if(bufferedElements.size == threshold){
            for(ele <- bufferedElements){
                println(ele)
            }
            bufferedElements.clear()
        }
    }
    //persists the buffer when a savepoint|checkpoint is taken
    override def snapshotState(context: FunctionSnapshotContext): Unit = {
        listState.clear()
        for(ele <- bufferedElements){
            listState.add(ele)
        }
    }
    //state recovery | initialization - create the state
    override def initializeState(context: FunctionInitializationContext): Unit = {
        val lsd = new ListStateDescriptor[(String, Int)]("buffered-elements",createTypeInformation[(String,Int)])
        listState=context.getOperatorStateStore.getListState(lsd)
        if(context.isRestored){
            for(element <- listState.get().asScala) {
                bufferedElements += element
            }
        }
    }
}
var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.socketTextStream("CentOS",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new BufferingSink(5))

fsEnv.execute("testoperatorstate")
  • Start a netcat server
[root@CentOS ~]# nc -lk 9999
  • Submit the job

Note: set the parallelism to 1 to make testing easier.

  • Type the following into netcat
[root@CentOS ~]# nc -lk 9999
a1 b1 c1 d1

  • Configure the Hadoop environment variables

    [root@CentOS ~]# vi /root/.bashrc
    HADOOP_HOME=/usr/hadoop-2.9.2
    JAVA_HOME=/usr/java/latest
    PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    CLASSPATH=.
    export JAVA_HOME
    export PATH
    export CLASSPATH
    export HADOOP_HOME
    HADOOP_CLASSPATH=`hadoop classpath`
    export HADOOP_CLASSPATH
    
    [root@CentOS ~]# source .bashrc
    
    # best to restart Flink afterwards
    
  • Cancel the job and create a savepoint

[root@CentOS flink-1.8.1]# ./bin/flink list -m CentOS:8081
------------------ Running/Restarting Jobs -------------------
17.10.2019 09:49:20 : f21795e74312eb06fbf0d48cb8d90489 : testoperatorstate (RUNNING)
--------------------------------------------------------------
[root@CentOS flink-1.8.1]# ./bin/flink cancel -m CentOS:8081 -s hdfs:///savepoints f21795e74312eb06fbf0d48cb8d90489
Cancelling job f21795e74312eb06fbf0d48cb8d90489 with savepoint to hdfs:///savepoints.
Cancelled job f21795e74312eb06fbf0d48cb8d90489. Savepoint stored in hdfs://CentOS:9000/savepoints/savepoint-f21795-38e7beefe07b.

Note: if Flink needs to integrate with Hadoop, HADOOP_HOME or HADOOP_CLASSPATH must be present in the environment.

  • Verify the restored state

ListCheckpointed

This interface is a variant of CheckpointedFunction that only supports the even-split redistribution scheme.

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
  • snapshotState: called when a checkpoint is taken, to snapshot the state.
  • restoreState: the counterpart of initializeState declared in CheckpointedFunction above; used to restore the state.

Example

package com.baizhi.operatorstate

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import java.lang.{Long => JLong}
import java.util
import java.util.Collections
import scala.collection.JavaConverters._
import scala.{Long => SLong}

class CustomStatefulSourceFunction extends ParallelSourceFunction[SLong] with ListCheckpointed [JLong]{
  @volatile
  var isRunning:Boolean = true
  var offset = 0L
  //called when a checkpoint is taken: snapshot the state
  override def snapshotState(checkpointId: SLong, timestamp: SLong): util.List[JLong] = {
    //stores the source's current offset; if the state cannot be split, wrap it with Collections.singletonList(offset)
    Collections.singletonList(offset)//singleton list
  }
  //counterpart of initializeState in CheckpointedFunction: restore the state
  override def restoreState(state: util.List[JLong]): Unit = {
    for(s<-state.asScala){
      offset = s  //restore the offset from state
    }
  }

  override def run(ctx: SourceFunction.SourceContext[SLong]): Unit = {
    val lock = ctx.getCheckpointLock  //checkpoint lock
    while (isRunning){
      Thread.sleep(1000)  //slow the source down to make testing easier
      lock.synchronized({ //reading and advancing the offset must be atomic w.r.t. checkpoints
        ctx.collect(offset)
        offset += 1
      })
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
package com.baizhi.operatorstate

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment,_}

object ListCheckpointed {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    fsEnv.addSource[Long](new CustomStatefulSourceFunction)
      .print("offset:")

    fsEnv.execute("testOffset")
  }
}
com.baizhi.operatorstate.ListCheckpointed
[root@CentOS ~]# cd /usr/flink-1.8.1/
[root@CentOS flink-1.8.1]# ./bin/flink list -m CentOS:8081
[root@CentOS flink-1.8.1]# ./bin/flink cancel -m CentOS:8081 -s hdfs:///savepoints1 41f3fcd7f0832c519659d4a7aac8dce3
# resubmit with the state restored - the source continues from where it left off
offset:> 142
offset:> 143
offset:> 143
offset:> 144

Broadcast State

A third type of supported operator state is the Broadcast State. Broadcast state was introduced to support use cases where some data coming from one stream needs to be broadcast to all downstream tasks, where it is stored locally and is used to process all incoming elements on the other stream.

√non-keyed

Scenario: computing each user's purchase path in real time.

# if the connection fails, check whether the port is already in use; if so, pick another port
[root@CentOS ~]# netstat -an | grep 9888
[root@CentOS ~]# netstat -an | grep 8765
/* three basic case classes */

//rule:         category      threshold
case class Rule(channel:String,threshold:Int)
//user action:       id        name        category       action
case class UserAction (id:String,name:String,channel:String,action:String)
/* the user purchase path computed from the two streams */
//                      id        name        category       path length
case class UserBuyPath (id:String,name:String,channel:String,path:Int)

Processing stream 1 (the user-action stream):

package com.baizhi.broadcaststate

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
//[processing for stream 1] maps user-action records to user purchase-path records
class UserActionRichMapFunction extends RichMapFunction[UserAction,UserBuyPath]{
  var buyPathState:MapState[String,Int] = _   //(channel,path)
  //initialize the state
  override def open(parameters: Configuration): Unit = {
    //                                  name                 key type                      value type
    val msd = new MapStateDescriptor[String,Int]("buy-path",createTypeInformation[String],createTypeInformation[Int])

    buyPathState = getRuntimeContext.getMapState(msd)
  }
  //the mapping
  override def map(value: UserAction): UserBuyPath = {
    val channel = value.channel
    val action = value.action
    var path = 0
    if(buyPathState.contains(channel)){// if the channel is already in the state, read its path
      /* [on the first action the channel is not in the state yet, so path keeps its initial value 0;
         if the following if block were nested inside this one it would never run] */
      path = buyPathState.get(channel)
    }
    if(action.equals("buy")){// a purchase clears the path for this channel
      buyPathState.remove(channel)
    }else{// any other action extends the path by 1
      buyPathState.put(channel,path+1)
    }
    UserBuyPath(value.id,value.name,value.channel,buyPathState.get(channel))
  }
}

Processing the two connected streams together:

package com.baizhi.broadcaststate

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

//user purchase path                        broadcast state descriptor                                                       stream1  stream2  output
class UserBuyPathBroadcastProcessFunction(msd:MapStateDescriptor[String,Int]) extends BroadcastProcessFunction[UserBuyPath,Rule,String]{
  //processes UserBuyPath records; reads the broadcast state
  override def processElement(value: UserBuyPath, ctx: BroadcastProcessFunction[UserBuyPath, Rule, String]#ReadOnlyContext, out: Collector[String]): Unit = {
    val broadcastState = ctx.getBroadcastState(msd)//the broadcast state
    if(broadcastState.contains(value.channel)){ //only evaluate if a rule exists for this channel
      val threshold = broadcastState.get(value.channel)//threshold for this channel
      if(value.path >= threshold){//emit users whose purchase path reached the threshold
        println("++++++++++++++ rule matched ++++++++++++++++++")
        out.collect(value.id+" "+value.path+" "+value.channel+" "+value.path)
      }
    }
  }

  //processes Rule records; updates the broadcast state
  override def processBroadcastElement(value: Rule, ctx: BroadcastProcessFunction[UserBuyPath, Rule, String]#Context, out: Collector[String]): Unit = {
    val broadcastState = ctx.getBroadcastState(msd)
    broadcastState.put(value.channel,value.threshold)//update the state
    println("===============rule==================")
    for (entry <- broadcastState.entries().asScala){
      println(entry.getKey+"\t"+entry.getValue)
      println()
      println()
    }
  }
}

driver

package com.baizhi.broadcaststate

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object NonKeyedBroadcastStateFunction {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    /* stream 1: user action events */
    // id   name    channel  action
    /*001 zhangsan 手机 view
      001 zhangsan 手机 view
      001 zhangsan 手机 view
      001 zhangsan 手机 view
      001 zhangsan 手机 view
      001 zhangsan 手机 view
      002 lisi 手机 view
      002 lisi 手机 view
      002 lisi 手机 view
      002 lisi 手机 view*/
    // 001 zhangsan 手机 addToCart
    // 001 zhangsan 手机 buy
    val userStream = fsEnv.socketTextStream("CentOS",9888)
      .map(line=>line.split("\\s+"))
      .map(ts=>(UserAction(ts(0),ts(1),ts(2),ts(3))))
      .keyBy("id","name")
      .map(new UserActionRichMapFunction)

    val msd = new MapStateDescriptor[String,Int]("braodcast-sate",createTypeInformation[String],createTypeInformation[Int])

    /* stream 2: rules */
    //category  threshold
    //手机 4
    val broadcastStream: BroadcastStream[Rule] = fsEnv.socketTextStream("CentOS", 8765)
      .map(line => line.split("\\s+"))
      .map(ts => Rule(ts(0), ts(1).toInt))
      .broadcast(msd)

    userStream.connect(broadcastStream)
      .process(new UserBuyPathBroadcastProcessFunction(msd))
      .print()

    fsEnv.execute("testoperatorstate")

  }
}

keyed

The keyed variant additionally exposes the current key in the process function; it has fewer practical use cases.

class UserBuyPathKeyedBroadcastProcessFunction(msd:MapStateDescriptor[String,Int]) extends KeyedBroadcastProcessFunction[String,UserAction,Rule,String]{
  override def processElement(value: UserAction,
                              ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    println("value:"+value +" key:"+ctx.getCurrentKey)
    println("=====state======")
    for(entry <- ctx.getBroadcastState(msd).immutableEntries().asScala){
      println(entry.getKey+"\t"+entry.getValue)
    }
  }

  override def processBroadcastElement(value: Rule, ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#Context, out: Collector[String]): Unit = {
     println("Rule:"+value)
    //update the broadcast state
    ctx.getBroadcastState(msd).put(value.channel,value.threshold)
  }
}

case class Rule(channel:String,threshold:Int)
case class UserAction(id:String,name:String ,channel:String,action:String)
var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment
// id   name    channel  action
// 001 zhangsan 手机      view
// 001 zhangsan 手机      view
// 001 zhangsan 手机      addToCart
// 001 zhangsan 手机 buy
val userKeyedStream = fsEnv.socketTextStream("CentOS", 9999)
.map(line => line.split("\\s+"))
.map(ts => UserAction(ts(0), ts(1), ts(2), ts(3)))
.keyBy(0)// only a single key field can be used here


val msd=new MapStateDescriptor[String,Int]("braodcast-sate",createTypeInformation[String],
                                           createTypeInformation[Int])
// channel  threshold
// 手机类 10
// 电子类 10
val broadcastStream: BroadcastStream[Rule] = fsEnv.socketTextStream("CentOS", 8888)
.map(line => line.split("\\s+"))
.map(ts => Rule(ts(0), ts(1).toInt))
.broadcast(msd)

userKeyedStream.connect(broadcastStream)
.process(new UserBuyPathKeyedBroadcastProcessFunction(msd))
.print()


fsEnv.execute("testoperatorstate")

Checkpoint & SavePoints

A checkpoint is Flink's fault-tolerance mechanism: according to the configured checkpoint settings, the system periodically and automatically backs up the state of the computation. If a failure occurs during the computation, the system picks the most recent checkpoint and recovers from it.

A savepoint is an operational tool: the user manually triggers a backup of the program state. Under the hood it is also a checkpoint.
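
As a hedged sketch of the operational workflow (the job ID and target paths are illustrative; the cancel-with-savepoint variant also appears later in these notes):

# trigger a savepoint for a running job
[root@CentOS flink-1.8.1]# ./bin/flink savepoint -m CentOS:8081 221d5fa916523f88741e2abf39453b81 hdfs:///savepoints

# or cancel the job while taking a savepoint
[root@CentOS flink-1.8.1]# ./bin/flink cancel -m CentOS:8081 -s hdfs:///savepoints 221d5fa916523f88741e2abf39453b81

# resume a new submission from the savepoint
[root@CentOS flink-1.8.1]# ./bin/flink run -m CentOS:8081 -s hdfs://CentOS:9000/savepoints/savepoint-xxxx --class com.baizhi.helloword.FlinkWordCount /root/original-flink-1.0-SNAPSHOT.jar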

Prerequisites for failure recovery:

  • A persistent (durable) data source that can replay records for a certain amount of time (e.g. FlinkKafkaConsumer).
  • Permanent storage for state, usually a distributed file system (e.g. HDFS).
var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment
//enable checkpointing
fsEnv.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
//a checkpoint must complete within 2s, otherwise it is aborted
fsEnv.getCheckpointConfig.setCheckpointTimeout(2000)
//minimum pause between checkpoints (<= the checkpoint interval)
fsEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(5)
//number of concurrent checkpoints; defaults to 1 if not configured
fsEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//if a checkpoint fails, the task fails as well
fsEnv.getCheckpointConfig.setFailOnCheckpointingErrors(true)
//checkpoints are stored externally (filesystem, rocksdb); configures whether they are retained when the job is cancelled
fsEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val props = new Properties()
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")

fsEnv.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(line => line.split("\\s+"))
.map((_,1))
.keyBy(0)// only a single key field can be used here
.sum(1)
.print()

fsEnv.execute("testoperatorstate")

State backend

The state backend determines how Flink stores the state of the computation (and its checkpoints). Flink currently ships with three state backend implementations:

  • Memory (jobmanager): the default, usually for testing. State is kept in the JobManager's memory; with the larger state of real production jobs this easily leads to OOM (out of memory).
  • FileSystem: state is kept in the TaskManagers' memory and, driven by the checkpoint mechanism, backed up to a file system. Commonly used in production, but with very large clusters the TaskManager memory can still overflow.
  • RocksDB: state is kept in the TaskManagers' memory and spilled to local disk through RocksDB when memory is not enough, with backups to a remote file system. RocksDB is therefore the recommended choice (a configuration sketch follows after the per-job example below).

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html

1. Every job can configure its own state backend implementation:

var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment
val fsStateBackend:StateBackend = new FsStateBackend("hdfs:///xxx") //MemoryStateBackend、FsStateBackend、RocksDBStateBackend
fsEnv.setStateBackend(fsStateBackend)
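
A hedged sketch of the RocksDB variant (assuming the flink-statebackend-rocksdb_2.11 dependency is added to the pom; the checkpoint URI is illustrative):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.scala._

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
// the second argument enables incremental checkpoints (only changed state is uploaded)
val rocksDBBackend: StateBackend = new RocksDBStateBackend("hdfs://CentOS:9000/flink-rocksdb-checkpoints", true)
fsEnv.setStateBackend(rocksDBBackend)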

2. If the user configures nothing, the cluster-wide default is used, which is set in flink-conf.yaml:

[root@CentOS ~]# cd /usr/flink-1.8.1/
# stop Flink before editing the configuration
[root@CentOS flink-1.8.1]# ./bin/stop-cluster.sh
[root@CentOS flink-1.8.1]# vi conf/flink-conf.yaml
# restart Flink after editing the configuration
[root@CentOS flink-1.8.1]# ./bin/start-cluster.sh
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
 state.backend: rocksdb
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
 state.checkpoints.dir: hdfs:///flink-checkpoints
# Default target directory for savepoints, optional.
#
 state.savepoints.dir: hdfs:///flink-savepoints
 
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
 state.backend.incremental: true

The HADOOP_CLASSPATH environment variable must be set so that the HDFS-backed state backends can be used.

Can operators still be changed after a Flink job has been released?

In Spark this is not allowed, because Spark persists the code fragments themselves; once the code changes, the checkpoint has to be deleted. Flink only stores the state of each operator, so the code can be changed, as long as the user assigns a uid to every stateful operator.

package com.baizhi.checkpointandsavepoint

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object Checkpoint {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //enable the checkpoint mechanism with exactly-once semantics
    fsEnv.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
    //a checkpoint must complete within 2s, otherwise it is aborted
    fsEnv.getCheckpointConfig.setCheckpointTimeout(2000)
    //minimum pause between two checkpoints (must be <= the checkpoint interval; a checkpoint is taken every 5s here)
    fsEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(5)
    //number of concurrent checkpoints, defaults to 1
    fsEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    //if a checkpoint fails, the task fails as well (usually true; with false the task keeps running)
    fsEnv.getCheckpointConfig.setFailOnCheckpointingErrors(true)
    //externalize checkpoints (filesystem, rocksdb); RETAIN_ON_CANCELLATION keeps them when the job is cancelled (commonly used), DELETE_ON_CANCELLATION removes them
    fsEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    //Kafka connection properties
    val props = new Properties()
    props.setProperty("bootstrap.servers", "CentOS:9092")
    props.setProperty("group.id", "g1")

    fsEnv.addSource(new FlinkKafkaConsumer[String]("topic3",new SimpleStringSchema(),props))
      .uid("kafka-consumer")
      .flatMap(line=>line.split("\\s+"))
      .map((_,1))
      .keyBy(0)
      .sum(1)
      .uid("word-count")
      .map(t=>t._1+"->"+t._2)
      .print()

    fsEnv.execute("testcheckpoint")
  }
}

Test this on the cluster (whose configuration has already been adjusted as above) and submit a fat jar; without the fat jar the Kafka connector dependency is missing.

#Resume the previous state from a checkpoint: when submitting, provide the entry class and the checkpoint path (web UI "Savepoint Path" field or flink run -s <path>)
com.baizhi.checkpointandsavepoint.Checkpoint
hdfs://CentOS:9000/flink-checkpoints/633a08dacef3ec0dc87d4da0e707bf03/chk-18

How does the Flink Kafka consumer achieve exactly-once semantics? The consumer keeps the partition offsets in Flink's checkpointed state; on failure the job rewinds to the offsets of the last completed checkpoint together with all operator state, so every record is reflected exactly once in that state. End-to-end exactly-once towards an external system additionally requires a transactional (two-phase-commit) sink.
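
A minimal hedged sketch of the consumer side (it reuses the broker address and group id from the example above; setCommitOffsetsOnCheckpoints only controls the offsets reported back to Kafka for monitoring, it does not change Flink's own guarantees):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaExactlyOnceSketch {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "CentOS:9092")
    props.setProperty("group.id", "g1")

    val consumer = new FlinkKafkaConsumer[String]("topic01", new SimpleStringSchema(), props)
    //the offsets live in Flink's checkpointed state; committing them back to Kafka
    //on checkpoint completion is only useful for external monitoring tools
    consumer.setCommitOffsetsOnCheckpoints(true)

    fsEnv.addSource(consumer).print()
    fsEnv.execute("kafka-exactly-once-sketch")
  }
}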

Windows

Windows are at the heart of stream processing. A window splits the stream into finite-size "buckets" over which we can apply aggregations (ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction). The basic structure of a window computation in Flink is:

Keyed Windows

stream
    .keyBy(...)
    .window(...)               <-  required: the window assigner (window type)
    [.trigger(...)]            <-  optional: trigger (every assigner has a default) deciding when the window fires
    [.evictor(...)]            <-  optional: evictor (none by default) removing elements from the window
    [.allowedLateness(...)]    <-  optional: allowed lateness (default 0, i.e. no late data accepted)
    [.sideOutputLateData(...)] <-  optional: output tag routing too-late data to a side output
    .reduce/aggregate/fold/apply()      <-  required: the window function that aggregates the window's data
    [.getSideOutput(...)]      <-  optional: output tag to read the side output, usually the late data

Non-Keyed Windows

stream
    .windowAll(...)            <-  required: the window assigner (window type)
    [.trigger(...)]            <-  optional: trigger (every assigner has a default) deciding when the window fires
    [.evictor(...)]            <-  optional: evictor (none by default) removing elements from the window
    [.allowedLateness(...)]    <-  optional: allowed lateness (default 0, i.e. no late data accepted)
    [.sideOutputLateData(...)] <-  optional: output tag routing too-late data to a side output
    .reduce/aggregate/fold/apply()      <-  required: the window function that aggregates the window's data
    [.getSideOutput(...)]      <-  optional: output tag to read the side output, usually the late data

Window Lifecycle

In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness (see Allowed Lateness). Flink guarantees removal only for time-based windows and not for other types, e.g. global windows (see Window Assigners).

A window is created as soon as the first element that falls into its time range arrives, and it is removed automatically once the watermark passes the window's end time (plus any allowed lateness). Flink only guarantees removal for time-based windows (sliding, tumbling, session); global windows are never removed, because they are not bounded by time (they typically fire based on a custom trigger such as an element count).

in addition, each window will have a Trigger (see Triggers) and a function (ProcessWindowFunction, ReduceFunction,AggregateFunction or FoldFunction) (see Window Functions) attached to it. The function will contain the computation to be applied to the contents of the window, while the Trigger specifies the conditions under which the window is considered ready for the function to be applied.

Every window has a Trigger and a window function attached to it: the function carries the computation applied to the window's contents, while the trigger decides when the window is ready to fire. All window types come with a default trigger except global windows, which have none. In addition, an Evictor can be specified to remove elements from the window before and/or after the window function runs.

Apart from the above, you can specify an Evictor (see Evictors) which will be able to remove elements from the window after the trigger fires and before and/or after the function is applied.

Difference between Keyed and Non-Keyed Windows

  • It depends on whether the input stream is keyed or non-keyed.
  • Keyed windows: the window computation runs as multiple parallel instances, and all elements with the same key are always routed to the same window instance.
  • Non-keyed windows: there is only a single window instance (parallelism 1) at any point in time; see the sketch below.
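
A minimal hedged sketch of a non-keyed window (host and port are placeholders, as in the other examples); note that windowAll forces the window operator to run with parallelism 1:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object NonKeyedTumblingWindow {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //a single window instance over the whole stream, no keyBy
    fsEnv.socketTextStream("CentOS", 7788)
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce((a, b) => ("all-words", a._2 + b._2)) //count every word of the 5s window together
      .print()

    fsEnv.execute("NonKeyedTumblingWindow")
  }
}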

Window Assigners

The window assigner defines how elements are assigned to windows. This is done by specifying the WindowAssigner of your choice in the window(...) (for keyedstreams) or the windowAll() (for non-keyed streams) call.

A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink comes with pre-defined window assigners for the most common use cases, namely tumbling windows, sliding windows, session windows and global windows. You can also implement a custom window assigner by extending the WindowAssigner class. All built-in window assigners (except the global windows) assign elements to windows based on time, which can either be processing time or event time.

The window assigner defines how elements are assigned to windows, i.e. the window type. Flink ships with assigners for the most common cases: tumbling windows, sliding windows, session windows and global windows. All of them except global windows assign elements based on time, which can be either processing time or event time. A time window carries a start time and an end time that describe its range; the interval is closed at the start and open at the end, and its maxTimestamp method returns the largest timestamp that still belongs to the window.

Tumbling Windows

A tumbling window has a fixed length and a slide equal to that length, so windows never overlap.

package com.baizhi.window

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object KeyedTumblingWindow {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    fsEnv.socketTextStream("CentOS",7788)
      .flatMap(_.split("\\s+"))
      .map((_,1))
      .keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce((v1,v2)=>(v1._1,v1._2+v2._2))
      .print()

    fsEnv.execute("TumbingWindow")
  }
}

Sliding Windows

The window length is larger than the slide interval, so elements overlap across windows.
The variant below uses ProcessWindowFunction, which gives access to the Window object.

package com.baizhi.window

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object KeyedSlidingWindow {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    fsEnv.socketTextStream("CentOS",7788)
      .flatMap(_.split("\\s+"))
      .map((_,1))
      .keyBy(_._1)//use a key selector instead of a position index so the key type is known
      .window(SlidingProcessingTimeWindows.of(Time.seconds(4),Time.seconds(2)))
      .process(new ProcessWindowFunction[(String,Int),String,String,TimeWindow] {
        override def process(key: String, context: Context,
                             elements: Iterable[(String, Int)],
                             out: Collector[String]): Unit = {
          val sdf = new SimpleDateFormat("HH:mm:ss")
          val window = context.window//可以拿到窗口对象
          println(sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd))
          for (e <- elements){
            println(e+"\t")
          }
          println()
        }
      })

    fsEnv.execute("TumbingWindow")
  }
}

Session Windows (merging windows)

The assigner looks at the time gap between elements: if the gap is smaller than the session gap, the element is merged into the current window; if it is larger, the current window closes and subsequent elements start a new one. Unlike tumbling and sliding windows, session windows have no fixed size; under the hood the assigner merges windows.

package com.baizhi.window

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.scala.function.{WindowFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.{ProcessingTimeSessionWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object KeyedSessionWindow {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    fsEnv.socketTextStream("CentOS",7788)
      .flatMap(_.split("\\s+"))
      .map((_,1))
      .keyBy(_._1)//use a key selector instead of a position index so the key type is known
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
      .apply(new WindowFunction[(String,Int),String,String,TimeWindow]{
        override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
          val sdf = new SimpleDateFormat("HH:mm:ss")
          println(sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd))
          for(e<-input){
            println(e+"\t")
          }
          println()
        }
      })

    fsEnv.execute("SessionWindow")
  }
}

Global Windows

All elements with the same key are placed into one global window. By default this window never fires, because GlobalWindows has no default trigger (NeverTrigger), so the user must supply a custom Trigger.

package com.baizhi.window

import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow  //这个包不能忘导了
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.util.Collector


object KeyedGlobalWindows {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    fsEnv.socketTextStream("CentOS",7788)
      .flatMap(_.split("\\s+"))
      .map((_,1))
      .keyBy(_._1)//use a key selector instead of a position index so the key type is known
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of[GlobalWindow](3))//the window fires once it contains three elements
      .apply(new WindowFunction[(String,Int),String,String,GlobalWindow]{
        override def apply(key: String, window: GlobalWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
          println("=======window========")
          for(e<- input){
            print(e+"\t")
          }
          println()
        }
      })

    fsEnv.execute("SessionWindow")
  }
}

Window Functions

Once a window is considered ready, its Window Function is invoked to aggregate the window's contents. The common forms are ReduceFunction, AggregateFunction, FoldFunction and ProcessWindowFunction (plus the legacy WindowFunction).

ReduceFunction

class SumReduceFunction extends ReduceFunction[(String,Int)]{
  override def reduce(v1: (String, Int), v2: (String, Int)): (String, Int) = {
    (v1._1,v1._2+v2._2)
  }
}
package com.baizhi.windowfunctions.sum

import org.apache.flink.streaming.api.scala.{ StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object ReduceRrocessFunction {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    val countStream = fsEnv.socketTextStream("CentOS", 7788)
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(4)))
      .reduce(new SumReduceFunction)
      .print()
      .setParallelism(1)


    fsEnv.execute("Window")

  }
}

AggregateFunction

class SumAggregateFunction extends AggregateFunction[(String,Int),(String,Int),(String,Int)]{
  override def createAccumulator(): (String,Int) = {
    ("",0)
  }

  override def add(value: (String, Int), accumulator: (String,Int)): (String,Int) = {
    (value._1,accumulator._2+value._2)
  }
  override def merge(a: (String,Int), b: (String,Int)): (String,Int) = {
    (a._1,a._2+b._2)
  }
  override def getResult(accumulator: (String,Int)): (String, Int) = {
    accumulator
  }
}

var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.socketTextStream("CentOS",9999)
    .flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .aggregate(new SumAggregateFunction)
    .print()

fsEnv.execute("window")
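
The accumulator type of an AggregateFunction may differ from both the input and the output type, which is what sets it apart from ReduceFunction. As a hedged illustration (not part of the original example), an average per key over the window could look like this:

import org.apache.flink.api.common.functions.AggregateFunction

//accumulator = (key, sum, count), output = (key, average)
class AvgAggregateFunction extends AggregateFunction[(String, Int), (String, Long, Long), (String, Double)] {
  override def createAccumulator(): (String, Long, Long) = ("", 0L, 0L)

  override def add(value: (String, Int), acc: (String, Long, Long)): (String, Long, Long) =
    (value._1, acc._2 + value._2, acc._3 + 1)

  override def merge(a: (String, Long, Long), b: (String, Long, Long)): (String, Long, Long) =
    (if (a._1.nonEmpty) a._1 else b._1, a._2 + b._2, a._3 + b._3)

  override def getResult(acc: (String, Long, Long)): (String, Double) =
    (acc._1, if (acc._3 == 0) 0.0 else acc._2.toDouble / acc._3)
}

It would be plugged in exactly like the sum version above, i.e. .aggregate(new AvgAggregateFunction) after the window assigner.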

FoldFunction

class SumFoldFunction  extends  FoldFunction[(String,Int),(String,Long)]{
  override def fold(accumulator: (String, Long), value: (String, Int)): (String, Long) = {
    (value._1,accumulator._2+value._2)
  }
}
var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.socketTextStream("CentOS",8877)
    .flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    //.fold(("",0L),new SumFoldFunction)
    .fold(("",0L))((acc,v)=>(v._1,acc._2+v._2))
    .print()

fsEnv.execute("window")

ProcessWindowFunction

var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("CentOS",7788)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(SlidingProcessingTimeWindows.of(Time.seconds(4),Time.seconds(2)))
.process(new ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{
    override def process(key: String, context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[(String,Int)]): Unit = {
        val results = elements.reduce((v1,v2)=>(v1._1,v1._2+v2._2))
        out.collect(results)
    }
}).print()
fsEnv.execute("window")

globalState()|windowState()

  • globalState(), which allows access to keyed state that is not scoped to a window
  • windowState(), which allows access to keyed state that is also scoped to the window
  • windowState() is scoped to the current window: the next window does not see the previous window's counts.
  • globalState() is keyed but not scoped to a window, so its value keeps accumulating across windows.
package com.baizhi.globalandwindowstate

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object StateProcessFunction {
  def main(args: Array[String]): Unit = {
    var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment

    val globalTag = new OutputTag[(String,Int)]("globalTag")

    val countsStream = fsEnv.socketTextStream("CentOS", 7788)
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2)))
      .process(new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
        var wvds:ValueStateDescriptor[Int] = _
        var gvds:ValueStateDescriptor[Int] = _

        override def open(parameters: Configuration): Unit = {
          wvds = new ValueStateDescriptor[Int]("window-value",createTypeInformation[Int])
          gvds = new ValueStateDescriptor[Int]("global-value",createTypeInformation[Int])
        }
        override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
          val total = elements.map(_._2).sum

          val ws = context.windowState.getState(wvds)
          val gs = context.globalState.getState(gvds)

          val historyWindowValue = ws.value()
          val historyGlobalValue = gs.value()
          out.collect((key, historyWindowValue + total))
          context.output(globalTag, (key, historyGlobalValue + total))

          ws.update(historyWindowValue + total)
          gs.update(historyGlobalValue + total)
        }
      })
    countsStream.print("窗口统计")
    countsStream.getSideOutput(globalTag).print("全局输出")

    fsEnv.execute("window")
  }
}

ReduceFunction+ProcessWindowFunction

package com.baizhi.combine

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object ReduceRrocessFunction {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    val countsStream = fsEnv.socketTextStream("CentOS", 7788)
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(4)))
      .reduce(new SumReduceFunction,new ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{
        override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
          val total = elements.map(_._2).sum
          out.collect((key, total))
        }
      })
      .setParallelism(1)
    countsStream.print("窗口统计")
    fsEnv.execute("Window")
  }
}

FoldFunction+ProcessWindowFunction

package com.baizhi.combine.fold

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object FoldRrocessFunction {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    val countsStream = fsEnv.socketTextStream("CentOS", 7788)
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(4)))
      .fold(("",0L),new SumFoldFunction,new ProcessWindowFunction[(String,Long),(String,Long),String,TimeWindow]{
        override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {
          val total = elements.map(_._2).sum
          out.collect((key, total))
        }
      })
      .setParallelism(1)
        .print()

    fsEnv.execute("Window")
  }
}

WindowFunction(遗产|古董)

Rarely used any more; ProcessWindowFunction is the usual replacement.

In some places where a ProcessWindowFunction can be used you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and does not have some advances features, such as per-window keyed state. This interface will be deprecated at some point.

fsEnv.socketTextStream("CentOS",7788)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1) //must use a key selector here, not a position index
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.apply(new WindowFunction[(String,Int),(String,Int),String,TimeWindow] {
    override def apply(key: String,
                       window: TimeWindow,
                       input: Iterable[(String, Int)],
                       out: Collector[(String, Int)]): Unit = {
        out.collect((key,input.map(_._2).sum))
    }
}).print()

fsEnv.execute("window")

Triggers

A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).

WindowAssigner                   Default Trigger
GlobalWindows                    NeverTrigger
event-time windows               EventTimeTrigger
processing-time windows          ProcessingTimeTrigger

The trigger interface has five methods that allow a Trigger to react to different events:

  • The onElement() method is called for each element that is added to a window.
  • The onEventTime() method is called when a registered event-time timer fires.
  • The onProcessingTime() method is called when a registered processing-time timer fires.
  • The onMerge() method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
  • Finally the clear() method performs any action needed upon removal of the corresponding window.
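
As a hedged illustration of this interface (a sketch, not one of Flink's built-in triggers), the trigger below fires the window on every incoming element and leaves cleanup to the normal window removal:

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

//fires (without purging) each time an element is added to a time window
class FireOnEveryElementTrigger[T] extends Trigger[T, TimeWindow] {
  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.FIRE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ()
}

It would be attached with .trigger(new FireOnEveryElementTrigger[(String, Int)]) on a time-window stream, in the same way CountTrigger is used in the next example.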

DeltaTrigger

Fires the window when a user-defined delta between a previous data point and the latest one exceeds a threshold.

package com.baizhi.triggers

import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

object MyDeltaTrigger {
  def main(args: Array[String]): Unit = {
    var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment
    val deltaTrigger = DeltaTrigger.of[(String, Double), GlobalWindow](2.0, new DeltaFunction[(String, Double)] {
      override def getDelta(oldDataPoint: (String, Double), newDataPoint: (String, Double)): Double = {
        newDataPoint._2 - oldDataPoint._2
      }
    }, createTypeInformation[(String, Double)].createSerializer(fsEnv.getConfig))


    fsEnv.socketTextStream("CentOS",7788)
      .map(_.split("\\s+"))
      .map(ts=>(ts(0),ts(1).toDouble))
      .keyBy(0)
      .window(GlobalWindows.create())
      .trigger(deltaTrigger)
      .reduce((v1:(String,Double),v2:(String,Double))=>(v1._1,v1._2+v2._2))
      .print()
    fsEnv.execute("window")
  }

}

Evictors

The evictor has the ability to remove elements from a window after the trigger fires and before and/or after the window function is applied. To do so, the Evictor interface has two methods:

public interface Evictor<T, W extends Window> extends Serializable {
	void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

}

ErrorEvictor

class ErrorEvictor(isBefore:Boolean) extends Evictor[String,TimeWindow] {
    override def evictBefore(elements: lang.Iterable[TimestampedValue[String]], size: Int, window: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
        if(isBefore){
            evictor(elements,size,window,evictorContext)
        }
    }

    override def evictAfter(elements: lang.Iterable[TimestampedValue[String]], size: Int, window: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
        if(!isBefore){
            evictor(elements,size,window,evictorContext)
        }
    }
    private def evictor(elements: lang.Iterable[TimestampedValue[String]], size: Int, window: TimeWindow, evictorContext: Evictor.EvictorContext): Unit={
        val iterator = elements.iterator()
        while(iterator.hasNext){
            val it = iterator.next()
            if(it.getValue.contains("error")){//evict elements that contain "error"
                iterator.remove()
            }
        }
    }
}
var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.socketTextStream("CentOS",7788)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//windowAll: non-keyed window
.evictor(new ErrorEvictor(true))
.apply(new AllWindowFunction[String,String,TimeWindow] {
    override def apply(window: TimeWindow, input: Iterable[String], out: Collector[String]): Unit = {
        for(e <- input){
            out.collect(e)
        }
        print()
    }
})
.print()

fsEnv.execute("window")
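
Besides custom evictors, Flink ships with CountEvictor, DeltaEvictor and TimeEvictor. A hedged usage sketch (keyed variant; it keeps at most the last 3 elements of each window before the window function runs):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object CountEvictorSketch {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    fsEnv.socketTextStream("CentOS", 7788)
      .map((_, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .evictor(CountEvictor.of[TimeWindow](3)) //keep only the last 3 elements per window
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print()

    fsEnv.execute("CountEvictorSketch")
  }
}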

Event Time

When computing windows Flink supports three time semantics: Processing time, Event time and Ingestion time.

Processing time: windows are evaluated with the wall-clock time of the processing node.

Event time: windows are evaluated with the time at which the event was produced; this is the most accurate semantics.

Ingestion time: the time at which the record enters Flink, usually assigned by the SourceFunction.

By default Flink uses processing time, so to work with event time or ingestion time the time characteristic has to be set explicitly:

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//window  操作
fsEnv.execute("event time")

Once event-time processing is enabled, the user must declare how watermarks are computed: for each stream the system maintains a watermark time T, and a window fires only when its end time T' <= watermark(T). Flink does not provide the computation itself; the user supplies it in one of two ways: (1) periodically, driven by a timer interval (recommended), or (2) punctuated, where the watermark is re-evaluated for every incoming record.

case class AccessLog(channel:String, timestamp:Long)

Periodic

class AccessLogAssignerWithPeriodicWatermarks extends AssignerWithPeriodicWatermarks[AccessLog]{
  private var maxSeeTime:Long = 0L      //largest event time seen so far
  private var maxOrderness:Long = 2000L //allowed out-of-orderness
  override def getCurrentWatermark: Watermark = {
    new Watermark(maxSeeTime - maxOrderness)
  }

  override def extractTimestamp(element: AccessLog, previousElementTimestamp: Long): Long = {
    maxSeeTime=Math.max(maxSeeTime,element.timestamp)
    element.timestamp
  }
}

Punctuated (per record)

class AccessLogAssignerWithPunctuatedWatermarks extends AssignerWithPunctuatedWatermarks[AccessLog]{
  private var maxSeeTime:Long=0L
  private var maxOrderness:Long=2000L
  override def checkAndGetNextWatermark(lastElement: AccessLog, extractedTimestamp: Long): Watermark = {
    new Watermark(maxSeeTime-maxOrderness)
  }

  override def extractTimestamp(element: AccessLog, previousElementTimestamp: Long): Long = { 
    maxSeeTime=Math.max(maxSeeTime,element.timestamp)
    element.timestamp
  }
}
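
A hedged sketch wiring the punctuated assigner into a pipeline (it assumes the AccessLog case class and the assigner defined above; with punctuated watermarks setAutoWatermarkInterval is not needed because the watermark is re-evaluated for every record):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object PunctuatedWatermarkSketch {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    fsEnv.setParallelism(1)

    fsEnv.socketTextStream("CentOS", 8888)
      .map(_.split("\\s+"))
      .map(ts => AccessLog(ts(0), ts(1).toLong))
      .assignTimestampsAndWatermarks(new AccessLogAssignerWithPunctuatedWatermarks)
      .keyBy(_.channel)
      .window(TumblingEventTimeWindows.of(Time.seconds(4)))
      .reduce((a, b) => AccessLog(a.channel, Math.max(a.timestamp, b.timestamp))) //latest timestamp per channel and window
      .print()

    fsEnv.execute("punctuated-watermark-sketch")
  }
}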

Watermarks

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)//compute the watermark periodically, once per second
fsEnv.setParallelism(1)
//channel   timestamp
fsEnv.socketTextStream("CentOS",8888)
.map(line=> line.split("\\s+"))
.map(ts=>AccessLog(ts(0),ts(1).toLong))
.assignTimestampsAndWatermarks(new AccessLogAssignerWithPeriodicWatermarks)
.keyBy(accessLog=>accessLog.channel)
.window(TumblingEventTimeWindows.of(Time.seconds(4)))
.process(new ProcessWindowFunction[AccessLog,String,String,TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[AccessLog], out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        val window = context.window
        val currentWatermark = context.currentWatermark
        println("window:"+sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)+" \t watermarker:"+sdf.format(currentWatermark))
        for(e<-elements){
            val AccessLog(channel:String,timestamp:Long)=e
            out.collect(channel+"\t"+sdf.format(timestamp))
        }
    }
})
.print()
[aaa@qq.com ~]# nc -lk 8888
1 1571627570000
2 1571627571000
2 1571627573000
2 1571627574000

window:11:12:48	11:12:52 	 watermarker:11:12:52
1	11:12:50
window:11:12:48	11:12:52 	 watermarker:11:12:52
2	11:12:51

Window [11:12:48, 11:12:52): with 2s allowed out-of-orderness the watermark reaches 52 only when an event with timestamp 11:12:54 arrives (54 - 2 = 52), which is what triggers the window.

迟到数据处理

Flink can also handle late data: as long as watermark - window end <= allowed lateness, a late record still participates in the (re-)computation of its window; records beyond that are "too late" and are dropped by default.

What Spark calls late data corresponds to out-of-order data in Flink.

Too-late records are excluded from the computation, but they can still be captured, for example through a side output.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)//compute the watermark periodically, once per second
fsEnv.setParallelism(1)
//channel   timestamp
fsEnv.socketTextStream("CentOS",8888)
.map(line=> line.split("\\s+"))
.map(ts=>AccessLog(ts(0),ts(1).toLong))
.assignTimestampsAndWatermarks(new AccessLogAssignerWithPeriodicWatermarks)
.keyBy(accessLog=>accessLog.channel)
.window(TumblingEventTimeWindows.of(Time.seconds(4)))
.allowedLateness(Time.seconds(2))//allow records to be up to 2s late after the window end
.process(new ProcessWindowFunction[AccessLog,String,String,TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[AccessLog], out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        val window = context.window
        val currentWatermark = context.currentWatermark
        println("window:"+sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)+" \t watermarker:"+sdf.format(currentWatermark))
        for(e<-elements){
            val AccessLog(channel:String,timestamp:Long)=e
            out.collect(channel+"\t"+sdf.format(timestamp))
        }
    }
})
.print()

fsEnv.execute("event time")
[aaa@qq.com ~]# nc -lk 8888
1 1571627570000
2 1571627571000
3 1571627574000
1 1571627570000
2 1571627571000

window:11:12:48	11:12:52 	 watermarker:11:12:52
1	11:12:50
window:11:12:48	11:12:52 	 watermarker:11:12:52
2	11:12:51
window:11:12:48	11:12:52 	 watermarker:11:12:52
1	11:12:50
1	11:12:50
window:11:12:48	11:12:52 	 watermarker:11:12:52
2	11:12:51
Window [11:12:48, 11:12:52): it first fires when the watermark reaches 52 (event timestamp 11:12:54); with 2s allowed lateness, late records keep re-firing it until the watermark passes 54.

By default Flink simply drops too-late records; to get hold of them, route them to a side output:

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)//compute the watermark periodically, once per second
fsEnv.setParallelism(1)

val lateTag = new OutputTag[AccessLog]("latetag")
//channel   timestamp
val keyedWindowStream=fsEnv.socketTextStream("CentOS",8888)
.map(line=> line.split("\\s+"))
.map(ts=>AccessLog(ts(0),ts(1).toLong))
.assignTimestampsAndWatermarks(new AccessLogAssignerWithPeriodicWatermarks)
.keyBy(accessLog=>accessLog.channel)
.window(TumblingEventTimeWindows.of(Time.seconds(4)))
.allowedLateness(Time.seconds(2))
.sideOutputLateData(lateTag)
.process(new ProcessWindowFunction[AccessLog,String,String,TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[AccessLog], out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        val window = context.window
        val currentWatermark = context.currentWatermark
        println("window:"+sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)+" \t watermarker:"+sdf.format(currentWatermark))
        for(e<-elements){
            val AccessLog(channel:String,timestamp:Long)=e
            out.collect(channel+"\t"+sdf.format(timestamp))
        }
    }
})

keyedWindowStream.print("正常:")
keyedWindowStream.getSideOutput(lateTag).print("too late:")

fsEnv.execute("event time")

1 1571627570000
2 1571627571000
3 1571627574000
3 1571627575000
1 1571627570000
1 1571627576000
1 1571627570000
window:11:12:48	11:12:52 	 watermarker:11:12:52
正常:> 1	11:12:50
window:11:12:48	11:12:52 	 watermarker:11:12:52
正常:> 2	11:12:51
window:11:12:48	11:12:52 	 watermarker:11:12:53
正常:> 1	11:12:50
正常:> 1	11:12:50
too late:> AccessLog(1,1571627570000)
Window [11:12:48, 11:12:52): fires at watermark 52 (event timestamp 54); with 2s allowed lateness, records for this window arriving after the watermark has reached 54 (i.e. once an event with timestamp 11:12:56 has been seen) go to the too-late side output.

When an operator receives watermarks from several inputs or parallel channels, it uses the minimum of them.

Joining

Window Join

Basic syntax

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

Tumbling Window Join


case class User (id:String,name:String,ts:Long)
package com.baizhi.join

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
/* custom periodic watermark strategy */
class UserAssignerWithPeriodicWatermarks extends AssignerWithPeriodicWatermarks[User]{
  private var maxSeeTime:Long = 0L      //largest event time seen so far
  private var maxOrderness:Long = 2000L //allowed out-of-orderness

  override def getCurrentWatermark: Watermark = {
    new Watermark(maxSeeTime - maxOrderness)
  }

  override def extractTimestamp(element: User, previousElementTimestamp: Long): Long = {
    maxSeeTime = Math.max(maxSeeTime,element.ts)
    element.ts
  }
}
case class OrderItem(uid:String,name:String,price:Double,ts:Long)
package com.baizhi.join

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

/* custom periodic watermark strategy */
class OrderItemWithPeriodicWatermarks extends AssignerWithPeriodicWatermarks[OrderItem]{
  private var maxSeeTime:Long = 0L      //largest event time seen so far
  private var maxOrderness:Long = 2000L //allowed out-of-orderness

  override def getCurrentWatermark: Watermark = {
    new Watermark(maxSeeTime - maxOrderness)
  }

  override def extractTimestamp(element: OrderItem, previousElementTimestamp: Long): Long = {
    maxSeeTime = Math.max(maxSeeTime,element.ts)
    element.ts
  }
}

package com.baizhi.join

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TumblingWindowJoin {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    fsEnv.getConfig.setAutoWatermarkInterval(1000)
    fsEnv.setParallelism(1)
    //001 zhangsan 1571627570000
    val userStream = fsEnv.socketTextStream("CentOS",7788)
      .map(line=>line.split("\\s+"))
      .map(ts=>User(ts(0),ts(1),ts(2).toLong))
      .assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
      .setParallelism(1)

    //001 apple 4.5 1571627570000L
    val orderStream = fsEnv.socketTextStream("CentOS", 8899)
      .map(line => line.split("\\s+"))
      .map(ts => OrderItem(ts(0), ts(1), ts(2).toDouble, ts(3).toLong))
      .assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
      .setParallelism(1)


    userStream.join(orderStream)
      .where(user=>user.id)
      .equalTo(orderItem => orderItem.uid)
      .window(TumblingEventTimeWindows.of(Time.seconds(4)))
      .apply((u,o)=>{
        (u.id,u.name,o.name,o.price,o.ts)
      })
      .print()

    fsEnv.execute("FlinkStreamTumblingWindowJoin")
  }
}
[aaa@qq.com ~]# nc -lk 7788
001 zhangsan 1571627570000
002 lisi 1571627574000

[aaa@qq.com ~]# nc -lk 8899
001 apple 4.5 1571627570000
001 phone 3 1571627574000

(001,zhangsan,apple,4.5,1571627570000)

Sliding Window Join


package com.baizhi.join

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object SlidingWindowJoin {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    fsEnv.getConfig.setAutoWatermarkInterval(1000)
    fsEnv.setParallelism(1)
    //001 zhangsan 1571627570000
    val userStream = fsEnv.socketTextStream("CentOS",7788)
      .map(line=>line.split("\\s+"))
      .map(ts=>User(ts(0),ts(1),ts(2).toLong))
      .assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
      .setParallelism(1)

    //001 apple 4.5 1571627570000L
    val orderStream = fsEnv.socketTextStream("CentOS", 8899)
      .map(line => line.split("\\s+"))
      .map(ts => OrderItem(ts(0), ts(1), ts(2).toDouble, ts(3).toLong))
      .assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
      .setParallelism(1)


    userStream.join(orderStream)
      .where(user=>user.id)
      .equalTo(orderItem => orderItem.uid)
      .window(SlidingEventTimeWindows.of(Time.seconds(4),Time.seconds(2)))
      .apply((u,o)=>{
        (u.id,u.name,o.name,o.price,o.ts)
      })
      .print()

    fsEnv.execute("FlinkStreamTumblingWindowJoin")
  }
}

Session Window Join


package com.baizhi.join

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object SessionWindowJoin {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    fsEnv.getConfig.setAutoWatermarkInterval(1000)
    fsEnv.setParallelism(1)
    //001 zhangsan 1571627570000
    val userStream = fsEnv.socketTextStream("CentOS",7788)
      .map(line=>line.split("\\s+"))
      .map(ts=>User(ts(0),ts(1),ts(2).toLong))
      .assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
      .setParallelism(1)

    //001 apple 4.5 1571627570000L
    val orderStream = fsEnv.socketTextStream("CentOS", 8899)
      .map(line => line.split("\\s+"))
      .map(ts => OrderItem(ts(0), ts(1), ts(2).toDouble, ts(3).toLong))
      .assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
      .setParallelism(1)


    userStream.join(orderStream)
      .where(user=>user.id)
      .equalTo(orderItem => orderItem.uid)
      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
      .apply((u,o)=>{
        (u.id,u.name,o.name,o.price,o.ts)
      })
      .print()

    fsEnv.execute("FlinkStreamTumblingWindowJoin")
  }
}

Interval Join

An interval join (recommended in high-throughput scenarios) joins two streams on a key and a relative time interval rather than on a shared window.

The interval join joins elements of two streams (we'll call them A & B for now) with a common key and where elements of stream B have timestamps that lie in a relative time interval to timestamps of elements in stream A.

This can also be expressed more formally as b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

package com.baizhi.join

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object IntervalJoin {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    fsEnv.getConfig.setAutoWatermarkInterval(1000)
    fsEnv.setParallelism(1)
    //001 zhangsan 1571627570000
    val userStream = fsEnv.socketTextStream("CentOS",7788)
      .map(line=>line.split("\\s+"))
      .map(ts=>User(ts(0),ts(1),ts(2).toLong))
      .assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
      .setParallelism(1)
      .keyBy(_.id)//需要keyedStream

    //001 apple 4.5 1571627570000L
    val orderStream = fsEnv.socketTextStream("CentOS", 8899)
      .map(line => line.split("\\s+"))
      .map(ts => OrderItem(ts(0), ts(1), ts(2).toDouble, ts(3).toLong))
      .assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
      .setParallelism(1)
      .keyBy(_.uid)//需要keyedStream

    userStream.intervalJoin(orderStream)
        .between(Time.seconds(-1),Time.seconds(1))
        .process(new ProcessJoinFunction[User,OrderItem,String] {
          override def processElement(left: User, right: OrderItem, ctx: ProcessJoinFunction[User, OrderItem, String]#Context, out: Collector[String]): Unit = {
            print(left+"\t"+right)
            out.collect(left.id+" "+left.name+" "+right.name+" "+ right.price+" "+right.ts)
          }
        })
        .print()

    fsEnv.execute("FlinkStreamTumblingWindowJoin")
  }
}

Flink HA

The JobManager coordinates every Flink deployment. It is responsible for both scheduling and resource management.

By default, there is a single JobManager instance per Flink cluster. This creates a single point of failure (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail.

With JobManager High Availability, you can recover from JobManager failures and thereby eliminate the SPOF. You can configure high availability for both standalone and YARN clusters.

Standalone Cluster High Availability

The general idea of JobManager high availability for standalone clusters is that there is a single leading JobManager at any time and multiple standby JobManagers to take over leadership in case the leader fails. This guarantees that there is no single point of failureand programs can make progress as soon as a standby JobManager has taken leadership. There is no explicit distinction between standby and master JobManager instances. Each JobManager can take the role of master or standby.
Setup

Prerequisites (omitted)

  • Install a JDK
  • Install Hadoop HDFS in HA mode
  • Install Zookeeper

Building the Flink environment

  • Configure HADOOP_CLASSPATH
[aaa@qq.com ~]# vi .bashrc
HADOOP_HOME=/usr/hadoop-2.9.2
JAVA_HOME=/usr/java/latest
PATH=$PATH:$/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
export HADOOP_HOME
HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CLASSPATH
[aaa@qq.com ~]# source .bashrc
[aaa@qq.com ~]# echo $HADOOP_CLASSPATH
/usr/hadoop-2.9.2/etc/hadoop:/usr/hadoop-2.9.2/share/hadoop/common/lib/*:/usr/hadoop-2.9.2/share/hadoop/common/*:/usr/hadoop-2.9.2/share/hadoop/hdfs:/usr/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/usr/hadoop-2.9.2/share/hadoop/hdfs/*:/usr/hadoop-2.9.2/share/hadoop/yarn/lib/*:/usr/hadoop-2.9.2/share/hadoop/yarn/*:/usr/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/usr/hadoop-2.9.2/share/hadoop/mapreduce/*:/usr/hadoop-2.9.2/contrib/capacity-scheduler/*.jar
  • Upload and configure Flink
[aaa@qq.com ~]# tar -zxf flink-1.8.1-bin-scala_2.11.tgz -C /usr/
[aaa@qq.com ~]# cd /usr/flink-1.8.1
[aaa@qq.com flink-1.8.1]# vi conf/flink-conf.yaml
#==============================================================================
# Common
#==============================================================================
taskmanager.numberOfTaskSlots: 4
parallelism.default: 3
#==============================================================================
# High Availability
#==============================================================================
 high-availability: zookeeper
 high-availability.storageDir: hdfs:///flink/ha/
 high-availability.zookeeper.quorum: CentOSA:2181,CentOSB:2181,CentOSC:2181
 high-availability.zookeeper.path.root: /flink
 high-availability.cluster-id: /default_ns
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
 state.backend: rocksdb
 state.checkpoints.dir: hdfs:///flink-checkpoints
 state.savepoints.dir: hdfs:///flink-savepoints
 state.backend.incremental: true
[aaa@qq.com flink-1.8.1]# vi conf/masters
CentOSA:8081
CentOSB:8081
CentOSC:8081
[aaa@qq.com flink-1.8.1]# vi conf/slaves
CentOSA
CentOSB
CentOSC

Start the Flink cluster

[aaa@qq.com flink-1.8.1]# ./bin/start-cluster.sh
Starting HA cluster with 3 masters.
Starting standalonesession daemon on host CentOSA.
Starting standalonesession daemon on host CentOSB.
Starting standalonesession daemon on host CentOSC.
Starting taskexecutor daemon on host CentOSA.
Starting taskexecutor daemon on host CentOSB.
Starting taskexecutor daemon on host CentOSC.

Once the cluster has started, check the JobManager logs; on the leader host you will see:

 http://xxx:8081 was granted leadership with leaderSessionID=f5338c3f-c3e5-4600-a07c-566e38bc0ff4

Testing HA

Log in to the node that currently holds leadership and run:

[aaa@qq.com flink-1.8.1]# ./bin/jobmanager.sh stop
[aaa@qq.com flink-1.8.1]#  ./bin/jobmanager.sh start

Check the other nodes in the same way; the node whose log prints the leadership message is now the master.

Summary / review notes:

  • Flink watermarks (compare with watermarking in Spark Structured Streaming) and state.
  • State: keyed state, broadcast state, and the TTL lifecycle controls (a snapshot written without TTL cannot be restored with TTL enabled, and vice versa).
  • When changing the code of a Flink job, assign uids to the stateful operators.
  • Evolution of the stack: Hadoop -> Spark; Spark DStream -> Structured Streaming -> Flink.
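
A hedged sketch of the TTL configuration mentioned above (StateTtlConfig as in Flink 1.8/1.9; the descriptor name and the 10 minute TTL are made up for illustration):

import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._

object StateTtlSketch {
  def main(args: Array[String]): Unit = {
    //expire entries 10 minutes after they were created or last written,
    //and never return values that have already expired
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.minutes(10))
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build()

    //a keyed ValueState descriptor with TTL enabled; restoring a snapshot taken
    //without TTL into this descriptor (or the other way round) fails with a
    //state compatibility error, which is what the note above refers to
    val vsd = new ValueStateDescriptor[Long]("visit-count", createTypeInformation[Long])
    vsd.enableTimeToLive(ttlConfig)
    //inside a RichFunction the state would then be obtained in open() via
    //getRuntimeContext.getState(vsd)
  }
}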
