欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink

程序员文章站 2024-01-13 14:09:28
...

sd

http://storm.apache.org/

 

LinkedBlockingQueue

http://docs.oracle.com/javase/7/docs/api/

 

并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法

http://www.cnblogs.com/linjiqin/archive/2013/05/30/3108188.html

在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。

注:什么叫线程安全?这个首先要明确。线程安全就是说多线程访问同一代码,不会产生不确定的结果。

并行和并发区别

1、并行是指两者同时执行一件事,比如赛跑,两个人都在不停的往前跑;
2、并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率

LinkedBlockingQueue
由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

 

 

流式大数据处理的三种框架:Storm,Spark和Samza

http://www.csdn.net/article/2015-03-09/2824135

许多分布式计算系统都可以实时或接近实时地处理大数据流。本文将对三种Apache框架分别进行简单介绍,然后尝试快速、高度概述其异同。

Apache Storm

Storm中,先要设计一个用于实时计算的图状结构,我们称之为拓扑(topology)。这个拓扑将会被提交给集群,由集群中的主控节点(master node)分发代码,将任务分配给工作节点(worker node)执行。一个拓扑中包括spout和bolt两种角色,其中spout发送消息,负责将数据流以tuple元组的形式发送出去;而bolt则负责转换这些数据流,在bolt中可以完成计算、过滤等操作,bolt自身也可以随机将数据发送给其他bolt。由spout发射出的tuple是不可变数组,对应着固定的键值对。

JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  

 

Apache Spark

Spark Streaming是核心Spark API的一个扩展,它并不会像Storm那样一次一个地处理数据流,而是在处理前按时间间隔预先将其切分为一段一段的批处理作业。Spark针对持续性数据流的抽象称为DStream(DiscretizedStream),一个DStream是一个微批处理(micro-batching)的RDD(弹性分布式数据集);而RDD则是一种分布式数据集,能够以两种方式并行运作,分别是任意函数和滑动窗口数据的转换。

JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  

 

Apache Samza

Samza处理数据流时,会分别按次处理每条收到的消息。Samza的流单位既不是元组,也不是Dstream,而是一条条消息。在Samza中,数据流被切分开来,每个部分都由一组只读消息的有序数列构成,而这些消息每条都有一个特定的ID(offset)。该系统还支持批处理,即逐次处理同一个数据流分区的多条消息。Samza的执行与数据流模块都是可插拔式的,尽管Samza的特色是依赖Hadoop的Yarn(另一种资源调度器)和Apache Kafka。

JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  

 

共同之处

以上三种实时计算系统都是开源的分布式系统,具有低延迟、可扩展和容错性诸多优点,它们的共同特色在于:允许你在运行数据流代码时,将任务分配到一系列具有容错能力的计算机上并行运行。此外,它们都提供了简单的API来简化底层实现的复杂程度。

三种框架的术语名词不同,但是其代表的概念十分相似:

JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  

 

对比图

下面表格总结了一些不同之处:

JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  

 

数据传递形式分为三大类:

 

  1. 最多一次(At-most-once):消息可能会丢失,这通常是最不理想的结果。
  2. 最少一次(At-least-once):消息可能会再次发送(没有丢失的情况,但是会产生冗余)。在许多用例中已经足够。
  3. 恰好一次(Exactly-once):每条消息都被发送过一次且仅仅一次(没有丢失,没有冗余)。这是最佳情况,尽管很难保证在所有用例中都实现。

 

另一个方面是状态管理:对状态的存储有不同的策略,Spark Streaming将数据写入分布式文件系统中(例如HDFS);Samza使用嵌入式键值存储;而在Storm中,或者将状态管理滚动至应用层面,或者使用更高层面的抽象Trident。

用例

这三种框架在处理连续性的大量实时数据时的表现均出色而高效,那么使用哪一种呢?选择时并没有什么硬性规定,最多就是几个指导方针。

如果你想要的是一个允许增量计算的高速事件处理系统,Storm会是最佳选择。它可以应对你在客户端等待结果的同时,进一步进行分布式计算的需求,使用开箱即用的分布式RPC(DRPC)就可以了。最后但同样重要的原因:Storm使用Apache Thrift,你可以用任何编程语言来编写拓扑结构。如果你需要状态持续,同时/或者达到恰好一次的传递效果,应当看看更高层面的Trdent API,它同时也提供了微批处理的方式。

JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  

 

使用Storm的公司有:Twitter,雅虎,Spotify还有The Weather Channel等。

说到微批处理,如果你必须有状态的计算,恰好一次的递送,并且不介意高延迟的话,那么可以考虑Spark Streaming,特别如果你还计划图形操作、机器学习或者访问SQL的话,Apache Spark的stack允许你将一些library与数据流相结合(Spark SQL,Mllib,GraphX),它们会提供便捷的一体化编程模型。尤其是数据流算法(例如:K均值流媒体)允许Spark实时决策的促进。

使用Spark的公司有:亚马逊,雅虎,NASA JPLeBay还有百度等。

如果你有大量的状态需要处理,比如每个分区都有许多十亿位元组,那么可以选择Samza。由于Samza将存储与处理放在同一台机器上,在保持处理高效的同时,还不会额外载入内存。这种框架提供了灵活的可插拔API:它的默认execution、消息发送还有存储引擎操作都可以根据你的选择随时进行替换。此外,如果你有大量的数据流处理阶段,且分别来自不同代码库的不同团队,那么Samza的细颗粒工作特性会尤其适用,因为它们可以在影响最小化的前提下完成增加或移除的工作。

使用Samza的公司有:LinkedInIntuitMetamarketsQuantiplyFortscale等。

结论

本文中我们只对这三种Apache框架进行了简单的了解,并未覆盖到这些框架中大量的功能与更多细微的差异。同时,文中这三种框架对比也是受到限制的,因为这些框架都在一直不断的发展,这一点是我们应当牢记的。

原文链接:Streaming Big Data: Storm, Spark and Samza (编译/孙薇 责编/周建丁 )

 

业务场景:风控大数据指标计算-如:黄牛党分析

问题案例1:apache storm OOM内存溢出,内存对象发送给对方没有ack确认,不断堆积导致JVM定期重启

参考案例:

Severe memory leak to OOM when ackers disabled

https://issues.apache.org/jira/browse/STORM-339

初步分析:

jboss javacore 分析

http://dl2.iteye.com/upload/attachment/0122/3260/3c64b265-7064-37dd-b10e-eae56dc21a39.png

JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  

jboss heapdump分析工具http://www.eclipse.org/mat/下载,分析jboss生成的3716.hprof文件,MAT工具打不开,调整如下内存-Xmx4g即可。

C:\Users\Lindows\Desktop\dev\MemoryAnalyzer-1.5.0.20150527-win32.win32.x86_64\MemoryAnalyzer.ini

 

-startup
plugins/org.eclipse.equinox.launcher_1.3.100.v20150511-1540.jar
--launcher.library
plugins/org.eclipse.equinox.launcher.win32.win32.x86_64_1.1.300.v20150602-1417
-vmargs
-Xmx4g
http://dl2.iteye.com/upload/attachment/0122/3192/3bd28a3a-2a77-3ed8-8807-27fe74d8092b.png
JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE   

追踪分析:

apache_storm_storm-core-0.9.1-incubating-sources.jar_Config.class.png

D:/mvn_repository/org/apache/storm/storm-core/0.9.1-incubating/storm-core-0.9.1-incubating-sources.jar

JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  

apache_storm_storm-core-0.9.1-incubating-sources.jar_MainTest.java.png

JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE   

 

新一代大数据处理引擎 Apache Flink

https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-flink/ 

大数据计算引擎的发展

这几年大数据的飞速发展,出现了很多热门的开源社区,其中著名的有 Hadoop、Storm,以及后来的 Spark,他们都有着各自专注的应用场景。Spark 掀开了内存计算的先河,也以内存为赌注,赢得了内存计算的飞速发展。Spark 的火热或多或少的掩盖了其他分布式计算的系统身影。就像 Flink,也就在这个时候默默的发展着。

在国外一些社区,有很多人将大数据的计算引擎分成了 4 代,当然,也有很多人不会认同。我们先姑且这么认为和讨论。

首先第一代的计算引擎,无疑就是 Hadoop 承载的 MapReduce。这里大家应该都不会对 MapReduce 陌生,它将计算分为两个阶段,分别为 Map 和 Reduce。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。

由于这样的弊端,催生了支持 DAG 框架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来说,大多还是批处理的任务。

接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及强调的实时计算。在这里,很多人也会认为第三代计算引擎也能够很好的运行批处理的 Job。

随着第三代计算引擎的出现,促进了上层应用快速发展,例如各种迭代计算的性能以及对流计算和 SQL 等的支持。Flink 的诞生就被归在了第四代。这应该主要表现在 Flink 对流计算的支持,以及更一步的实时性上面。当然 Flink 也可以支持 Batch 的任务,以及 DAG 的运算。

或许会有人不同意以上的分类,我觉得其实这并不重要的,重要的是体会各个框架的差异,以及更适合的场景。并进行理解,没有哪一个框架可以完美的支持所有的场景,也就不可能有任何一个框架能完全取代另一个,就像 Spark 没有完全取代 Hadoop,当然 Flink 也不可能取代 Spark。本文将致力描述 Flink 的原理以及应用。

Flink 简介

很多人可能都是在 2015 年才听到 Flink 这个词,其实早在 2008 年,Flink 的前身已经是柏林理工大学一个研究性项目, 在 2014 被 Apache 孵化器所接受,然后迅速地成为了 ASF(Apache Software Foundation)的*项目之一。Flink 的最新版本目前已经更新到了 0.10.0 了,在很多人感慨 Spark 的快速发展的同时,或许我们也该为 Flink 的发展速度点个赞。

Flink 是一个针对流数据和批数据的分布式处理引擎。它主要是由 Java 代码实现。目前主要还是依靠开源社区的贡献而发展。对 Flink 而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。再换句话说,Flink 会把所有任务当成流来处理,这也是其最大的特点。Flink 可以支持本地的快速迭代,以及一些环形的迭代任务。并且 Flink 可以定制化内存管理。在这点,如果要对比 Flink 和 Spark 的话,Flink 并没有将内存完全交给应用层。这也是为什么 Spark 相对于 Flink,更容易出现 OOM 的原因(out of memory)。就框架本身与应用场景来说,Flink 更相似与 Storm。如果之前了解过 Storm 或者 Flume 的读者,可能会更容易理解 Flink 的架构和很多概念。下面让我们先来看下 Flink 的架构图。

图 1. Flink 架构图
JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
 

如图 1 所示,我们可以了解到 Flink 几个最基础的概念,Client、JobManager 和 TaskManager。Client 用来提交任务给 JobManager,JobManager 分发任务给 TaskManager 去执行,然后 TaskManager 会心跳的汇报任务状态。看到这里,有的人应该已经有种回到 Hadoop 一代的错觉。确实,从架构图去看,JobManager 很像当年的 JobTracker,TaskManager 也很像当年的 TaskTracker。然而有一个最重要的区别就是 TaskManager 之间是是流(Stream)。其次,Hadoop 一代中,只有 Map 和 Reduce 之间的 Shuffle,而对 Flink 而言,可能是很多级,并且在 TaskManager 内部和 TaskManager 之间都会有数据传递,而不像 Hadoop,是固定的 Map 到 Reduce。

Flink 中的调度简述

在 Flink 集群中,计算资源被定义为 Task Slot。每个 TaskManager 会拥有一个或多个 Slots。JobManager 会以 Slot 为单位调度 Task。但是这里的 Task 跟我们在 Hadoop 中的理解是有区别的。对 Flink 的 JobManager 来说,其调度的是一个 Pipeline 的 Task,而不是一个点。举个例子,在 Hadoop 中 Map 和 Reduce 是两个独立调度的 Task,并且都会去占用计算资源。对 Flink 来说 MapReduce 是一个 Pipeline 的 Task,只占用一个计算资源。类同的,如果有一个 MRR 的 Pipeline Task,在 Flink 中其也是一个被整体调度的 Pipeline Task。在 TaskManager 中,根据其所拥有的 Slot 个数,同时会拥有多个 Pipeline。

在 Flink StandAlone 的部署模式中,这个还比较容易理解。因为 Flink 自身也需要简单的管理计算资源(Slot)。当 Flink 部署在 Yarn 上面之后,Flink 并没有弱化资源管理。也就是说这时候的 Flink 在做一些 Yarn 该做的事情。从设计角度来讲,我认为这是不太合理的。如果 Yarn 的 Container 无法完全隔离 CPU 资源,这时候对 Flink 的 TaskManager 配置多个 Slot,应该会出现资源不公平利用的现象。Flink 如果想在数据中心更好的与其他计算框架共享计算资源,应该尽量不要干预计算资源的分配和定义。

需要深度学习 Flink 调度读者,可以在 Flink 的源码目录中找到 flink-runtime 这个文件夹,JobManager 的 code 基本都在这里。

Flink 的生态圈

一个计算框架要有长远的发展,必须打造一个完整的 Stack。不然就跟纸上谈兵一样,没有任何意义。只有上层有了具体的应用,并能很好的发挥计算框架本身的优势,那么这个计算框架才能吸引更多的资源,才会更快的进步。所以 Flink 也在努力构建自己的 Stack。

Flink 首先支持了 Scala 和 Java 的 API,Python 也正在测试中。Flink 通过 Gelly 支持了图操作,还有机器学习的 FlinkML。Table 是一种接口化的 SQL 支持,也就是 API 支持,而不是文本化的 SQL 解析和执行。对于完整的 Stack 我们可以参考下图。

图 2. Flink 的 Stack
JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
 

Flink 为了更广泛的支持大数据的生态圈,其下也实现了很多 Connector 的子项目。最熟悉的,当然就是与 Hadoop HDFS 集成。其次,Flink 也宣布支持了 Tachyon、S3 以及 MapRFS。不过对于 Tachyon 以及 S3 的支持,都是通过 Hadoop HDFS 这层包装实现的,也就是说要使用 Tachyon 和 S3,就必须有 Hadoop,而且要更改 Hadoop 的配置(core-site.xml)。如果浏览 Flink 的代码目录,我们就会看到更多 Connector 项目,例如 Flume 和 Kafka。

Flink 的部署

Flink 有三种部署模式,分别是 Local、Standalone Cluster 和 Yarn Cluster。对于 Local 模式来说,JobManager 和 TaskManager 会公用一个 JVM 来完成 Workload。如果要验证一个简单的应用,Local 模式是最方便的。实际应用中大多使用 Standalone 或者 Yarn Cluster。下面我主要介绍下这两种模式。

Standalone 模式

在搭建 Standalone 模式的 Flink 集群之前,我们需要先下载 Flink 安装包。这里我们需要下载 Flink 针对 Hadoop 1.x 的包。下载并解压后,进到 Flink 的根目录,然后查看 conf 文件夹,如下图。

图 3. Flink 的目录结构
JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
 

我们需要指定 Master 和 Worker。Master 机器会启动 JobManager,Worker 则会启动 TaskManager。因此,我们需要修改 conf 目录中的 master 和 slaves。在配置 master 文件时,需要指定 JobManager 的 UI 监听端口。一般情况下,JobManager 只需配置一个,Worker 则须配置一个或多个(以行为单位)。示例如下:

1
2
3
4
5
micledeMacBook-Pro:conf micle$ cat masters
localhost:8081
 
micledeMacBook-Pro:conf micle$ cat slaves
localhost

在 conf 目录中找到文件 flink-conf.yaml。在这个文件中定义了 Flink 各个模块的基本属性,如 RPC 的端口,JobManager 和 TaskManager 堆的大小等。在不考虑 HA 的情况下,一般只需要修改属性 taskmanager.numberOfTaskSlots,也就是每个 Task Manager 所拥有的 Slot 个数。这个属性,一般设置成机器 CPU 的 core 数,用来平衡机器之间的运算性能。其默认值为 1。配置完成后,使用下图中的命令启动 JobManager 和 TaskManager(启动之前,需要确认 Java 的环境是否已经就绪)。

图 4. 启动 StandAlone 模式的 Flink
JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
 

启动之后我们就可以登陆 Flink 的 GUI 页面。在页面中我们可以看到 Flink 集群的基本属性,在 JobManager 和 TaskManager 的页面中,可以看到这两个模块的属性。目前 Flink 的 GUI,只提供了简单的查看功能,无法动态修改配置属性。一般在企业级应用中,这是很难被接受的。因此,一个企业真正要应用 Flink 的话,估计也不得不加强 WEB 的功能。

图 5. Flink 的 GUI 页面
JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
 

Yarn Cluster 模式

在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的 Workload。因此 Flink 也支持在 Yarn 上面运行。首先,让我们通过下图了解下 Yarn 和 Flink 的关系。

图 6. Flink 与 Yarn 的关系
JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
 

在图中可以看出,Flink 与 Yarn 的关系与 MapReduce 和 Yarn 的关系是一样的。Flink 通过 Yarn 的接口实现了自己的 App Master。当在 Yarn 中部署了 Flink,Yarn 就会用自己的 Container 来启动 Flink 的 JobManager(也就是 App Master)和 TaskManager。

了解了 Flink 与 Yarn 的关系,我们就简单看下部署的步骤。在这之前需要先部署好 Yarn 的集群,这里我就不做介绍了。我们可以通过以下的命令查看 Yarn 中现有的 Application,并且来检查 Yarn 的状态。

1
yarn application –list

如果命令正确返回了,就说明 Yarn 的 RM 目前已经在启动状态。针对不同的 Yarn 版本,Flink 有不同的安装包。我们可以在 Apache Flink 的下载页中找到对应的安装包。我的 Yarn 版本为 2.7.1。再介绍具体的步骤之前,我们需要先了解 Flink 有两种在 Yarn 上面的运行模式。一种是让 Yarn 直接启动 JobManager 和 TaskManager,另一种是在运行 Flink Workload 的时候启动 Flink 的模块。前者相当于让 Flink 的模块处于 Standby 的状态。这里,我也主要介绍下前者。

在下载和解压 Flink 的安装包之后,需要在环境中增加环境变量 HADOOP_CONF_DIR 或者 YARN_CONF_DIR,其指向 Yarn 的配置目录。如运行下面的命令:

1
export HADOOP_CONF_DIR=/etc/hadoop/conf

这是因为 Flink 实现了 Yarn 的 Client,因此需要 Yarn 的一些配置和 Jar 包。在配置好环境变量后,只需简单的运行如下的脚本,Yarn 就会启动 Flink 的 JobManager 和 TaskManager。

1
yarn-session.sh –d –s 2 –tm 800 –n 2

上面的命令的意思是,向 Yarn 申请 2 个 Container 启动 TaskManager(-n 2),每个 TaskManager 拥有两个 Task Slot(-s 2),并且向每个 TaskManager 的 Container 申请 800M 的内存。在上面的命令成功后,我们就可以在 Yarn Application 页面看到 Flink 的纪录。如下图。

图 7. Flink on Yarn
JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
 

如果有些读者在虚拟机中测试,可能会遇到错误。这里需要注意内存的大小,Flink 向 Yarn 会申请多个 Container,但是 Yarn 的配置可能限制了 Container 所能申请的内存大小,甚至 Yarn 本身所管理的内存就很小。这样很可能无法正常启动 TaskManager,尤其当指定多个 TaskManager 的时候。因此,在启动 Flink 之后,需要去 Flink 的页面中检查下 Flink 的状态。这里可以从 RM 的页面中,直接跳转(点击 Tracking UI)。这时候 Flink 的页面如图 8。

图 8. Flink 的页面
JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
 

对于 Flink 安装时的 Trouble-shooting,可能更多时候需要查看 Yarn 相关的 log 来分析。这里就不多做介绍,读者可以到 Yarn 相关的描述中查找。

Flink 的 HA

对于一个企业级的应用,稳定性是首要要考虑的问题,然后才是性能,因此 HA 机制是必不可少的。另外,对于已经了解 Flink 架构的读者,可能会更担心 Flink 架构背后的单点问题。和 Hadoop 一代一样,从架构中我们可以很明显的发现 JobManager 有明显的单点问题(SPOF,single point of failure)。 JobManager 肩负着任务调度以及资源分配,一旦 JobManager 出现意外,其后果可想而知。Flink 对 JobManager HA 的处理方式,原理上基本和 Hadoop 一样(一代和二代)。

首先,我们需要知道 Flink 有两种部署的模式,分别是 Standalone 以及 Yarn Cluster 模式。对于 Standalone 来说,Flink 必须依赖于 Zookeeper 来实现 JobManager 的 HA(Zookeeper 已经成为了大部分开源框架 HA 必不可少的模块)。在 Zookeeper 的帮助下,一个 Standalone 的 Flink 集群会同时有多个活着的 JobManager,其中只有一个处于工作状态,其他处于 Standby 状态。当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选举新的 JobManager 来接管 Flink 集群。

对于 Yarn Cluaster 模式来说,Flink 就要依靠 Yarn 本身来对 JobManager 做 HA 了。其实这里完全是 Yarn 的机制。对于 Yarn Cluster 模式来说,JobManager 和 TaskManager 都是被 Yarn 启动在 Yarn 的 Container 中。此时的 JobManager,其实应该称之为 Flink Application Master。也就说它的故障恢复,就完全依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一样)。由于完全依赖了 Yarn,因此不同版本的 Yarn 可能会有细微的差异。这里不再做深究。

Flink 的 Rest API 介绍

Flink 和其他大多开源的框架一样,提供了很多有用的 Rest API。不过 Flink 的 RestAPI,目前还不是很强大,只能支持一些 Monitor 的功能。Flink Dashboard 本身也是通过其 Rest 来查询各项的结果数据。在 Flink RestAPI 基础上,可以比较容易的将 Flink 的 Monitor 功能和其他第三方工具相集成,这也是其设计的初衷。

在 Flink 的进程中,是由 JobManager 来提供 Rest API 的服务。因此在调用 Rest 之前,要确定 JobManager 是否处于正常的状态。正常情况下,在发送一个 Rest 请求给 JobManager 之后,Client 就会收到一个 JSON 格式的返回结果。由于目前 Rest 提供的功能还不多,需要增强这块功能的读者可以在子项目 flink-runtime-web 中找到对应的代码。其中最关键一个类 WebRuntimeMonitor,就是用来对所有的 Rest 请求做分流的,如果需要添加一个新类型的请求,就需要在这里增加对应的处理代码。下面我例举几个常用 Rest API。

1.查询 Flink 集群的基本信息: /overview。示例命令行格式以及返回结果如下:

1
2
$ curl http://localhost:8081/overview{"taskmanagers":1,"slots-total":16,
"slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}

2.查询当前 Flink 集群中的 Job 信息:/jobs。示例命令行格式以及返回结果如下:

1
2
$ curl http://localhost:8081/jobs{"jobs-running":[],"jobs-finished":
["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}

3.查询一个指定的 Job 信息: /jobs/jobid。这个查询的结果会返回特别多的详细的内容,这是我在浏览器中进行的测试,如下图:

图 9. Rest 查询具体的 Job 信息
JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
 

想要了解更多 Rest 请求内容的读者,可以去 Apache Flink 的页面中查找。由于篇幅有限,这里就不一一列举。

运行 Flink 的 Workload

WordCount 的例子,就像是计算框架的 helloworld。这里我就以 WordCount 为例,介绍下如何在 Flink 中运行 workload。

在安装好 Flink 的环境中,找到 Flink 的目录。然后找到 bin/flink,它就是用来提交 Flink workload 的工具。对于 WordCount,我们可以直接使用已有的示例 jar 包。如运行如下的命令:

1
./bin/flink run ./examples/WordCount.jar hdfs://user/root/test hdfs://user/root/out

上面的命令是在 HDFS 中运行 WordCount,如果没有 HDFS 用本地的文件系统也是可以的,只需要将“hdfs://”换成“file://”。这里我们需要强调一种部署关系,就是 StandAlone 模式的 Flink,也是可以直接访问 HDFS 等分布式文件系统的。

结束语

Flink 是一个比 Spark 起步晚的项目,但是并不代表 Flink 的前途就会暗淡。Flink 和 Spark 有很多类似之处,但也有很多明显的差异。本文并没有比较这两者之间的差异,这是未来我想与大家探讨的。例如 Flink 如何更高效的管理内存,如何进一步的避免用户程序的 OOM。在 Flink 的世界里一切都是流,它更专注处理流应用。由于其起步晚,加上社区的活跃度并没有 Spark 那么热,所以其在一些细节的场景支持上,并没有 Spark 那么完善。例如目前在 SQL 的支持上并没有 Spark 那么平滑。在企业级应用中,Spark 已经开始落地,而 Flink 可能还需要一段时间的打磨。在后续文章中,我会详细介绍如何开发 Flink 的程序,以及更多有关 Flink 内部实现的内容。 

 

end

 

  • JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
  • 大小: 250.9 KB
  • JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
  • 大小: 122.9 KB
  • JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
  • 大小: 193 KB
  • JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
            
    
    博客分类: Java EE  
  • 大小: 255.8 KB