欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署

程序员文章站 2024-02-21 21:05:41
...

1、Flink 简介

Flink 起源于 Stratosphere 项目,Stratosphere 是在 2010~2014 年由 3 所地处柏林的大学和欧洲的一些其他的大学共同进行的研究项目,2014 年 4 月 Stratosphere 的代码被复制并捐赠给了 Apache 软件基金会,参加这个孵化项目的初始成员是Stratosphere 系统的核心开发人员,2014 年 12 月,Flink 一跃成为 Apache 软件基金会的*项目。
在德语中,Flink 一词表示快速和灵巧,项目采用一只松鼠的彩色图案作为 logo,这不仅是因为松鼠具有快速和灵巧的特点,还因为柏林的松鼠有一种迷人的红棕色,而 Flink 的松鼠 logo 拥有可爱的尾巴,尾巴的颜色与 Apache 软件基金会的 logo 颜色相呼应,也就是说,这是一只 Apache 风格的松鼠。
【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署
Flink 项目的理念是:“Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。
Apache Flink 是一个框架和分布式处理引擎,用于对*和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署

Flink 的全球热度

【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署

Flink 目前在国内企业的应用

【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署

流数据处理为什么选择 Flink
  • 流数据更真实地反映了我们的生活方式
  • 传统的数据架构是基于有限数据集的
  • 我们的目标是:低延迟、高吞吐、结果的准确性、良好的容错性
哪些行业需要处理流数据
  • 电商和市场营销行业:数据报表、广告投放、业务流程需要
  • 物联网(IOT)行业:传感器实时数据采集和显示、实时报警
  • 交通运输业
  • 电信业:基站流量调配
  • 银行和金融业:实时结算和通知推送、实时检测异常行为

2、流处理架构演变

传统数据处理架构

事务处理:
【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署
分析处理:
将数据从业务数据库复制到数仓,再进行分析和查询
【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署

有状态的流式处理

【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署

Lambda架构

用两套系统,同时保证低延迟和结果准确
【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署
【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署

3、Flink 的主要特点

事件驱动型(Event-driven)

事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。比较典型的就是以 kafka 为代表的消息队列几乎都是事件驱动型应用。
与之不同的就是 SparkStreaming 微批次,如图:
【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署
事件驱动型:
【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署

流与批的世界观

批处理的特点是有界、持久、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。
理 流处理的特点是*、实时, 无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。
在 spark 的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。
而在 flink 的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和*流。
*数据流:*数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须连续处理*流,也就是说必须在获取后立即处理event。对于*数据流我们无法等待所有数据都到达,因为输入是*的,并且在任何时间点都不会完成。处理*数据通常要求以特定顺序(例如事件发生的顺序)获取 event,以便能够推断结果完整性。
有界数据流:有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理。
【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署
这种以流为世界观的架构,获得的最大好处就是具有极低的延迟。

分层API

最底层级的抽象仅仅提供了有状态流,它将通过过程函数(Process Function)被嵌入到 DataStream API 中。底层过程函数(Process Function) 与 DataStream API相集成,使其可以对某些特定的操作进行底层的抽象,它允许用户可以*地处理来自一个或多个数据流的事件,并使用一致的容错的状态。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。
实际上,大多数应用并不需要上述的底层抽象,而是针对核心 API(Core APIs)进行编程,比如 DataStream API(有界或*流数据)以及 DataSet API(有界数据集)。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些 API处理的数据类型以类(classes)的形式由各自的编程语言所表示。
Table API 是以表为中心的声明式编程,其中表可能会动态变化(在表达流数据时)。Table API 遵循(扩展的)关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时 API 提供可比较的操作,例如 select、project、join、group-by、aggregate 等。Table API 程序声明式地定义了什么逻辑操作应该执行,而不是准确地确定这些操作代码的看上去如何。
尽管 Table API 可以通过多种类型的用户自定义函数(UDF)进行扩展,其仍不如核心 API 更具表达能力,但是使用起来却更加简洁(代码量更少)。除此之外,Table API 程序在执行之前会经过内置优化器进行优化。
你可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与DataStream 以及 DataSet 混合使用。
Flink 提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与
Table API 类似,但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。
目前 Flink 作为批处理还不是主流,不如 Spark 成熟,所以 DataSet 使用的并不是很多。Flink Table API 和 Flink SQL 也并不完善,大多都由各大厂商自己定制。所以我们主要学习 DataStream API 的使用。实际上 Flink 作为最接近 Google DataFlow模型的实现,是流批统一的观点,所以基本上使用 DataStream 就可以了。
Flink 几大模块:

  • Flink Table & SQL
  • Flink Gelly(图计算)
  • Flink CEP(复杂事件处理)
Flink 的其它特点
  • 支持事件时间(event-time)和处理时间(processing-time)语义
  • 精确一次(exactly-once)的状态一致性保证
  • 低延迟,每秒处理数百万个事件,毫秒级延迟
  • 与众多常用存储系统的连接
  • 高可用,动态扩展,实现7*24小时全天候运行
Flink vs Spark Streaming
  • 流(stream)和微批(micro-batching)
    【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署
  • spark采用RDD模型,spark streaming的DStream实际上也就是一组组小批数据RDD的集合
  • flink基本数据模型是数据流,以及事件(Event)序列
  • spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
  • flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理

4、Flink手写WordCount实例

搭建 Maven 工程 FlinkWordCount,pom.xml文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
				xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
				xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
				http://maven.apache.org/xsd/maven-4.0.0.xsd">

	<modelVersion>4.0.0</modelVersion>
	<groupId>com.imu.flink</groupId>
	<artifactId>FlinkWordCount</artifactId>
	<version>1.0-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-scala_2.11</artifactId>
			<version>1.7.2</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_2.11</artifactId>
			<version>1.7.2</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<!--  该插件用于将 Scala 代码编译成 class 文件 -->
			<plugin>
				<groupId>net.alchim31.maven</groupId>
				<artifactId>scala-maven-plugin</artifactId>
				<version>3.4.6</version>
				<executions>
					<execution>
						<!--  声明绑定到 maven 的 compile 阶段 -->
						<goals>
							<goal>testCompile</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>3.0.0</version>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

批处理 WordCount:

object WordCount {

	def main(args: Array[String]): Unit = {
		//  创建执行环境
		val env = ExecutionEnvironment.getExecutionEnvironment
		//  从文件中读取数据
		val inputPath = "D:\\Projects\\BigData\\TestWC1\\src\\main\\resources\\hello.txt"
		val inputDS: DataSet[String] = env.readTextFile(inputPath)
		//  分词之后,对单词进行 groupby 分组,然后用 sum 进行聚合
		val wordCountDS: AggregateDataSet[(String, Int)] = inputDS
				.flatMap(_.split(" "))
				.map((_, 1))
				.groupBy(0)
				.sum(1)
		//  打印输出
		wordCountDS.print()
	}
}

流处理 WordCount:

object StreamWordCount {

	def main(args: Array[String]): Unit = {
		//  从外部命令中获取参数
		val params: ParameterTool = ParameterTool.fromArgs(args)
		val host: String = params.get("host")
		val port: Int = params.getInt("port")
		//  创建流处理环境
		val env = StreamExecutionEnvironment.getExecutionEnvironment
		//  接收socket文本流
		val textDstream: DataStream[String] = env.socketTextStream(host, port)
		// flatMap 和 Map 需要引用的隐式转换

		import org.apache.flink.api.scala._
		val dataStream: DataStream[(String, Int)] = textDstream
				.flatMap(_.split("\\s"))
				.filter(_.nonEmpty)
				.map((_, 1))
				.keyBy(0)
				.sum(1)
		dataStream.print().setParallelism(1)
		//  启动 executor,执行任务
		env.execute("Socket stream word count")
	}
}

在 linux 系统中用 netcat 命令进行发送测试:nc -lk 7777

5、Flink 部署

Standalone模式
  1. 解压缩 flink-1.7.2-bin-hadoop27-scala_2.11.tgz,进入 conf 目录中。
  2. 修改 flink/conf/flink-conf.yaml 文件:
jobmanager.rpc.address:bigdata-senior.ibeifeng.com
  1. 修改 conf/slave 文件:
bigdata-senior02.ibeifeng.com
bigdata-senior03.ibeifeng.com
  1. 分发给另外两台机子:
conf]$ xsync flink-1.7.0
  1. 启动:
bin]$ ./start-cluster.sh
  1. 访问 http://localhost:8081 可以对 flink 集群和任务进行监控管理。
    【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署
  2. 准备数据文件
how are you
fine thank you
and you
i am fine too thank you
  1. 把包含数据文件的文件夹,分发到 taskmanager 机器中,由于读取数据是从本地磁盘读取,实际任务会被分发到 taskmanager 的机器中,所以要把目标文件分发。
applog]$ xsync flink
  1. 执行程序
bin]$ ./flink run -c com.imu.flink.app.BatchWcApp /ext/flinkTest-1.0-SNAPSHOT.jar --input /applog/flink/input.txt --output /applog/flink/output.csv
  1. 到目标文件夹中查看计算结果,计算结果会保存到 taskmanager 的机器下,不会在 jobmanager 下。
am, 1
and, 1
are, 1
fine, 2
how, 1
i, 1
thank, 2
too, 1
you, 4
  1. 在WebUI控制台查看计算过程
    【Flink】Flink简介及Standalone、Yarn和Kubernetes模式的部署
Yarn 模式

以 Yarn 模式部署 Flink 任务时,要求 Flink 是有 Hadoop 支持的版本,Hadoop环境需要保证版本在 2.2 以上,并且集群中安装有 HDFS 服务。

  1. 启动 hadoop 集群
  2. 启动 yarn-session
./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d

其中:
-n(–container):TaskManager 的数量。
-s(–slots): 每个 TaskManager 的 slot 数量,默认一个 slot 一个 core,默认每个taskmanager 的 slot 的个数为 1,有时可以多一些 taskmanager,做冗余。
-jm:JobManager 的内存(单位 MB)。
-tm:每个 taskmanager 的内存(单位 MB)。
-nm:yarn 的 appName(现在 yarn 的 ui 上的名字)。
-d:后台执行。

  1. 执行任务
./flink  run  -m  yarn-cluster  -c  com.imu.flink.app.BatchWcApp /ext/flink0503-1.0-SNAPSHOT.jar  --input  /applog/flink/input.txt  --output /applog/flink/output5.csv
  1. 去 yarn 控制台8088端口查看任务状态
Kubernetes 部署

容器化部署时目前业界很流行的一项技术,基于 Docker 镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是 Kubernetes(k8s),而 Flink 也在最近的版本中支持了 k8s 部署模式。

  1. 搭建 Kubernetes 集群
  2. 配置各组件的 yaml 文件
    在 k8s 上构建 Flink Session Cluster,需要将 Flink 集群的组件对应的 docker 镜像分别在 k8s 上启动,包括 JobManager、TaskManager、JobManagerService 三个镜像服务。每个镜像服务都可以从*镜像仓库中获取。
  3. 启动 Flink Session Cluster
//  启动 jobmanager-service  服务
kubectl create -f jobmanager-service.yaml

//  启动 jobmanager-deployment 服务
kubectl create -f jobmanager-deployment.yaml

//  启动 taskmanager-deployment 服务
kubectl create -f taskmanager-deployment.yaml
  1. 访问 Flink UI 页面
    集群启动后,就可以通过 JobManagerServicers 中配置的 WebUI 端口,用浏览器输入以下 URL 来访问 Flink UI 页面了:
http://{JobManagerHost:Port}/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy
相关标签: BigData