欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink

程序员文章站 2022-06-16 17:26:37
...

Flink

1、Flink组件栈

Flink

1.1 Deployment层

主要涉及了Flink的部署模式,Flink支持多种部署模式:本地、集群(Standalone/YARN)

1.2 Runtime层

Runtime层提供了支持Flink计算的全部核心实现,比如:支持分布式Stream处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务

1.3 API层

API层主要实现了面向*Stream的流处理和面向Batch的批处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API 

1.4 Libaries层

在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类
面向流处理支持:CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作)
面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)

2、架构对比

Apache Flink SparkStreaming Storm
架构 架构介于spark和storm之间,主从结构与spark streaming相似,DataFlow Grpah与Storm相似,数据流可以被表示为一个有向图。 每个顶点是一个用户定义的运算,每向边表示数据的流动。 架构依赖spark,主从模式,每个Batch处理都依赖主(driver),可以理解为时间维度上的spark DAG。 Micro-Batch 主从模式,且依赖ZK,处理过程中对主的依赖不大。
容错 基于Chandy-Lamport distributed snapshots checkpoint机制Medium WAL及RDD 血统机制 High Records ACK Medium
处理模型与延迟 单条事件处理 亚秒级低延迟 一个事件窗口内的所有事件。秒级高延迟 每次传入的一个事件亚秒级低延迟
数据处理保证 exactly once exactly once(实现采用Chandy-Lamport 算法,即marker-checkpoint ) High at least once(实现采用record-level acknowledgments) Medium

3、Flink架构

Flink

3.1 JobManager

Flink系统的协调者,它负责接收Flink Job,调度组成Job的多个Task的执行
收集Job的状态信息,并管理Flink集群中从节点TaskManager

3.2 TaskManager

实际负责执行计算的Worker,在其上执行Flink Job的一组Task
TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报

3.3 Client

用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群

Client会将用户提交的Flink程序组装一个JobGraph, 并且是以JobGraph的形式提交的

4、WordCount

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>FlinkInScala</groupId>
    <artifactId>FlinkInScala</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
    </dependencies>

</project>

4.2 Code

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object WordCount {
  def main(args: Array[String]): Unit = {
    val port = {
      try {
        ParameterTool.fromArgs( args ).get( "port" ).toInt
      }catch {
        case e:Exception =>{
          print("port is not set,default use 9000")
        }
        9000
      }
    }
    print(port)
    //1、初始化运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //1.1 添加隐式转换
    //通过隐式转换来增强Scala API的扩展。
    import org.apache.flink.streaming.api.scala._
    //2、读取数据
    val text = env.socketTextStream("mini01",port,'\n')
    text.flatMap(_.split("\\s")) //切分并压平
      .map(x=>WordWithCount(x,1))  //将word转换成WordWithCount对象
      .keyBy("word") //根据key进行聚合
      .timeWindow(Time.seconds(5),Time.seconds(1))  //设置窗口时间
      .reduce((x,y)=>WordWithCount(x.word,x.count+y.count))  //聚合
      .print()  //打印到控制台
      .setParallelism(1)//使用线程数量为单线程
    env.execute() //提交执行任务
  }
  //样例类
  case class WordWithCount(word:String,count:Long)
}

5、Flink集群部署(Standalone)

5.1 部署步骤

1、上传flink-1.6.1-bin-hadoop27-scala_2.11.tgz到服务器
2、解压 tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11
3、修改配置文件 vi flink-1.6.1/conf/flink-conf.yaml
	jobmanager.rpc.address: mini01
4、修改slaves文件,添加从节点
	mini02
	mini03
5、将flink发到其他两台机器上面
	scp -r flink-1.6.1 mini02:/home/newdist/hadoop_apps/
	scp -r flink-1.6.1 mini03:/home/newdist/hadoop_apps/
6、配置环境变量 sudo vi /etc/profile
	export FLINK_HOME=/home/newdist/hadoop_apps/flink-1.6.1
	PATH=$FLINK_HOME/bin:xxxxx
	source /etc/profile
7、启动 flink
	start-cluster.sh
8、访问 mini01:8081

5.2 集群参数

# jobmanager节点可用的内存大小。
jobmanager.heap.size: 1024m

# The heap size for the TaskManager JVM
# taskmanager节点可用的内存代大小。
taskmanager.heap.size: 1024m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
# 每台机器可用的cpu数量
taskmanager.numberOfTaskSlots: 2

# The parallelism used for programs that did not specify and other parallelism.
# 默认情况下任务的并行度
parallelism.default: 1

slot和parallelism总结:
1、slot是静态的概念,是指taskmanager具有的并发执行能力
2、parallelism是动态的概念,是指程序运行实际使用的并发能力
3、设置合适的parallelism来提高运算效率

6、Flink集群部署(Yarn)

Flink

6.1 第一种模式

使用Yarn开辟出一块空间运行Flink,没有任务也会运行

1、配置Hadoop的yarn-site.xml,加入以下内容
	<property>
    	<name>yarn.nodemanager.vmem-check-enable</name>
    	<value>false</value>
	</property>
2、启动Hadoop集群 start-all.sh
3、启动flink on yarn yarn-session.sh -n 2 -jm 1024 -tm 1024 &
4、上传flink下的LICENSE文件
5、运行WordCount例子
	bin/flink run ./examples/batch/WordCount.jar -input hdfs://mini01:9000/LICENSE -output  hdfs://mini01:9000/wordcount.txt
6、如何找到Yarn上的flink1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
去找/tmp/.yarn-properties-hadoop来确定application的ID
- Found Yarn properties file under /tmp/.yarn-properties-hadoop.
- YARN properties set default parallelism to 4

参数:
用法:  
   必选  
     -n,--container <arg>   分配多少个yarn容器 (=taskmanager的数量)  
   可选  
     -D <arg>                        动态属性  
     -d,--detached                   独立运行  
     -jm,--jobManagerMemory <arg>    JobManager的内存 [in MB]  
     -nm,--name                     在YARN上为一个自定义的应用设置一个名字  
     -q,--query                      显示yarn中可用的资源 (内存, cpu核数)  
     -qu,--queue <arg>               指定YARN队列.  
     -s,--slots <arg>                每个TaskManager使用的slots数量  
     -tm,--taskManagerMemory <arg>   每个TaskManager的内存 [in MB]  
     -z,--zookeeperNamespace <arg>   针对HA模式在zookeeper上创建NameSpace 
     -id,--applicationId <yarnAppId>        YARN集群上的任务id,附着到一个后台运行的yarn session中

6.2 第二种模式

1、直接使用启动和运行命令用来运行flink on yarn
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /home/newdist/hadoop_apps/flink-1.6.1/examples/batch/WordCount.jar
2、原理:提交任务的时候创建新的Application,用来运行程序,如果没有任务就不用创建\

run
run [OPTIONS] <jar-file> <arguments>  
 "run" 操作参数:  
-c,--class <classname>  如果没有在jar包中指定入口类,则需要在这里通过这个参数指定  
-m,--jobmanager <host:port>  指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager  
-p,--parallelism <parallelism>   指定程序的并行度。可以覆盖配置文件中的默认值。
默认查找当前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】:
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
连接指定host和port的jobmanager:
./bin/flink run -m mini01:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
启动一个新的yarn-session:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
注意:yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀
例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar 

6.3 进程

ResourceManager
NodeManager
AppMater(jobmanager和它运行在一个Container中)
Container(taskmanager运行在上面)

Flink

7、Flink-HA

7.1 Standalone

#首先按照之前配置 standalone 的参数进行修改 
vi conf/flink-conf.yaml 
jobmanager.rpc.address: hadoop100 
vi conf/slaves 
hadoop101 
hadoop102 

# 然后修改配置 HA 需要的参数 
vi conf/masters
mini01:8081 
mini02:8081 
vi conf/flink-conf.yaml 
high-availability: zookeeper 
high-availability.zookeeper.quorum: mini01:2181,mini02:2181 ,mini03:2181 
# ZooKeeper 节点根目录,其下放置所有集群节点的 
namespace high-availability.zookeeper.path.root: /flink 
# ZooKeeper 节点集群 id,其中放置了集群所需的所有协调数据 
high-availability.cluster-id: /cluster_one 
# 建议指定 hdfs 的全路径。如果某个 flink 节点没有配置 hdfs 的话,不指定全路径无法识别 
# storageDir 存储了恢复一个 JobManager 所需的所有元数据。
high-availability.storageDir: hdfs:///:9000/flink/ha 
# 把 hadoop100 节点上修改好配置的 flink 安装目录拷贝到其他节点 

# 【先启动 zk 服务】 zkServer.sh start 
# 【先启动 hadoop 服务】 start-all.sh
# 启动 flink standalone HA 集群,在 mini01 节点上启动如下命令 start-cluster.sh

7.2 Yarn

首先需要修改 hadoop 中 yarn-site.xml 中的配置,设置提交应用程序的最大尝试次数 
<property> 
<name>yarn.resourcemanager.am.max-attempts</name> 
<value>4</value> 
</property> 
# 把修改后的配置文件同步到 hadoop 集群的其他节点
然后修改 flink 部分相关配置 可以解压一份新的 flink-1.6.1 安装包 
tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz 
修改配置文件【标红的目录名称建议和 standalone HA 中的配置区分开】 
vi conf/flink-conf.yaml high-availability: zookeeper 
high-availability.zookeeper.quorum: mini01:2181 
high-availability.storageDir: hdfs://mini01:9000/flink/ha-yarn 
high-availability.zookeeper.path.root: /flink-yarn 
yarn.application-attempts: 10

8、Flink-Scala-Shell

启动:start-scala-shell.sh local

val text = benv.fromElements("To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles")

val count = text.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)

count.print