Flink
程序员文章站
2022-06-16 17:26:37
...
文章目录
Flink
1、Flink组件栈
1.1 Deployment层
主要涉及了Flink的部署模式,Flink支持多种部署模式:本地、集群(Standalone/YARN)
1.2 Runtime层
Runtime层提供了支持Flink计算的全部核心实现,比如:支持分布式Stream处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务
1.3 API层
API层主要实现了面向*Stream的流处理和面向Batch的批处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API
1.4 Libaries层
在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类
面向流处理支持:CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作)
面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)
2、架构对比
Apache | Flink | SparkStreaming | Storm |
---|---|---|---|
架构 | 架构介于spark和storm之间,主从结构与spark streaming相似,DataFlow Grpah与Storm相似,数据流可以被表示为一个有向图。 每个顶点是一个用户定义的运算,每向边表示数据的流动。 | 架构依赖spark,主从模式,每个Batch处理都依赖主(driver),可以理解为时间维度上的spark DAG。 Micro-Batch | 主从模式,且依赖ZK,处理过程中对主的依赖不大。 |
容错 | 基于Chandy-Lamport distributed snapshots checkpoint机制Medium | WAL及RDD 血统机制 High | Records ACK Medium |
处理模型与延迟 | 单条事件处理 亚秒级低延迟 | 一个事件窗口内的所有事件。秒级高延迟 | 每次传入的一个事件亚秒级低延迟 |
数据处理保证 | exactly once | exactly once(实现采用Chandy-Lamport 算法,即marker-checkpoint ) High | at least once(实现采用record-level acknowledgments) Medium |
3、Flink架构
3.1 JobManager
Flink系统的协调者,它负责接收Flink Job,调度组成Job的多个Task的执行
收集Job的状态信息,并管理Flink集群中从节点TaskManager
3.2 TaskManager
实际负责执行计算的Worker,在其上执行Flink Job的一组Task
TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报
3.3 Client
用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群
Client会将用户提交的Flink程序组装一个JobGraph, 并且是以JobGraph的形式提交的
4、WordCount
4.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>FlinkInScala</groupId>
<artifactId>FlinkInScala</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
</dependencies>
</project>
4.2 Code
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
object WordCount {
def main(args: Array[String]): Unit = {
val port = {
try {
ParameterTool.fromArgs( args ).get( "port" ).toInt
}catch {
case e:Exception =>{
print("port is not set,default use 9000")
}
9000
}
}
print(port)
//1、初始化运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//1.1 添加隐式转换
//通过隐式转换来增强Scala API的扩展。
import org.apache.flink.streaming.api.scala._
//2、读取数据
val text = env.socketTextStream("mini01",port,'\n')
text.flatMap(_.split("\\s")) //切分并压平
.map(x=>WordWithCount(x,1)) //将word转换成WordWithCount对象
.keyBy("word") //根据key进行聚合
.timeWindow(Time.seconds(5),Time.seconds(1)) //设置窗口时间
.reduce((x,y)=>WordWithCount(x.word,x.count+y.count)) //聚合
.print() //打印到控制台
.setParallelism(1)//使用线程数量为单线程
env.execute() //提交执行任务
}
//样例类
case class WordWithCount(word:String,count:Long)
}
5、Flink集群部署(Standalone)
5.1 部署步骤
1、上传flink-1.6.1-bin-hadoop27-scala_2.11.tgz到服务器
2、解压 tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11
3、修改配置文件 vi flink-1.6.1/conf/flink-conf.yaml
jobmanager.rpc.address: mini01
4、修改slaves文件,添加从节点
mini02
mini03
5、将flink发到其他两台机器上面
scp -r flink-1.6.1 mini02:/home/newdist/hadoop_apps/
scp -r flink-1.6.1 mini03:/home/newdist/hadoop_apps/
6、配置环境变量 sudo vi /etc/profile
export FLINK_HOME=/home/newdist/hadoop_apps/flink-1.6.1
PATH=$FLINK_HOME/bin:xxxxx
source /etc/profile
7、启动 flink
start-cluster.sh
8、访问 mini01:8081
5.2 集群参数
# jobmanager节点可用的内存大小。
jobmanager.heap.size: 1024m
# The heap size for the TaskManager JVM
# taskmanager节点可用的内存代大小。
taskmanager.heap.size: 1024m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
# 每台机器可用的cpu数量
taskmanager.numberOfTaskSlots: 2
# The parallelism used for programs that did not specify and other parallelism.
# 默认情况下任务的并行度
parallelism.default: 1
slot和parallelism总结:
1、slot是静态的概念,是指taskmanager具有的并发执行能力
2、parallelism是动态的概念,是指程序运行实际使用的并发能力
3、设置合适的parallelism来提高运算效率
6、Flink集群部署(Yarn)
6.1 第一种模式
使用Yarn开辟出一块空间运行Flink,没有任务也会运行
1、配置Hadoop的yarn-site.xml,加入以下内容
<property>
<name>yarn.nodemanager.vmem-check-enable</name>
<value>false</value>
</property>
2、启动Hadoop集群 start-all.sh
3、启动flink on yarn yarn-session.sh -n 2 -jm 1024 -tm 1024 &
4、上传flink下的LICENSE文件
5、运行WordCount例子
bin/flink run ./examples/batch/WordCount.jar -input hdfs://mini01:9000/LICENSE -output hdfs://mini01:9000/wordcount.txt
6、如何找到Yarn上的flink1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
去找/tmp/.yarn-properties-hadoop来确定application的ID
- Found Yarn properties file under /tmp/.yarn-properties-hadoop.
- YARN properties set default parallelism to 4
参数:
用法:
必选
-n,--container <arg> 分配多少个yarn容器 (=taskmanager的数量)
可选
-D <arg> 动态属性
-d,--detached 独立运行
-jm,--jobManagerMemory <arg> JobManager的内存 [in MB]
-nm,--name 在YARN上为一个自定义的应用设置一个名字
-q,--query 显示yarn中可用的资源 (内存, cpu核数)
-qu,--queue <arg> 指定YARN队列.
-s,--slots <arg> 每个TaskManager使用的slots数量
-tm,--taskManagerMemory <arg> 每个TaskManager的内存 [in MB]
-z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace
-id,--applicationId <yarnAppId> YARN集群上的任务id,附着到一个后台运行的yarn session中
6.2 第二种模式
1、直接使用启动和运行命令用来运行flink on yarn
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /home/newdist/hadoop_apps/flink-1.6.1/examples/batch/WordCount.jar
2、原理:提交任务的时候创建新的Application,用来运行程序,如果没有任务就不用创建\
run
run [OPTIONS] <jar-file> <arguments>
"run" 操作参数:
-c,--class <classname> 如果没有在jar包中指定入口类,则需要在这里通过这个参数指定
-m,--jobmanager <host:port> 指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager
-p,--parallelism <parallelism> 指定程序的并行度。可以覆盖配置文件中的默认值。
默认查找当前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】:
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
连接指定host和port的jobmanager:
./bin/flink run -m mini01:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
启动一个新的yarn-session:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
注意:yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀
例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
6.3 进程
ResourceManager
NodeManager
AppMater(jobmanager和它运行在一个Container中)
Container(taskmanager运行在上面)
7、Flink-HA
7.1 Standalone
#首先按照之前配置 standalone 的参数进行修改
vi conf/flink-conf.yaml
jobmanager.rpc.address: hadoop100
vi conf/slaves
hadoop101
hadoop102
# 然后修改配置 HA 需要的参数
vi conf/masters
mini01:8081
mini02:8081
vi conf/flink-conf.yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: mini01:2181,mini02:2181 ,mini03:2181
# ZooKeeper 节点根目录,其下放置所有集群节点的
namespace high-availability.zookeeper.path.root: /flink
# ZooKeeper 节点集群 id,其中放置了集群所需的所有协调数据
high-availability.cluster-id: /cluster_one
# 建议指定 hdfs 的全路径。如果某个 flink 节点没有配置 hdfs 的话,不指定全路径无法识别
# storageDir 存储了恢复一个 JobManager 所需的所有元数据。
high-availability.storageDir: hdfs:///:9000/flink/ha
# 把 hadoop100 节点上修改好配置的 flink 安装目录拷贝到其他节点
# 【先启动 zk 服务】 zkServer.sh start
# 【先启动 hadoop 服务】 start-all.sh
# 启动 flink standalone HA 集群,在 mini01 节点上启动如下命令 start-cluster.sh
7.2 Yarn
首先需要修改 hadoop 中 yarn-site.xml 中的配置,设置提交应用程序的最大尝试次数
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
</property>
# 把修改后的配置文件同步到 hadoop 集群的其他节点
然后修改 flink 部分相关配置 可以解压一份新的 flink-1.6.1 安装包
tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz
修改配置文件【标红的目录名称建议和 standalone HA 中的配置区分开】
vi conf/flink-conf.yaml high-availability: zookeeper
high-availability.zookeeper.quorum: mini01:2181
high-availability.storageDir: hdfs://mini01:9000/flink/ha-yarn
high-availability.zookeeper.path.root: /flink-yarn
yarn.application-attempts: 10
8、Flink-Scala-Shell
启动:start-scala-shell.sh local
val text = benv.fromElements("To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles")
val count = text.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
count.print
上一篇: JAVA 注解之自定义注解
下一篇: 第1节-深入理解Tomcat