Strom入门篇
数据的生成可以看作一连串发生的离散事件,这些事件流会伴随着不同的数据流、操作和分析,都会由一个通用的软件框架和基础设施来处理。
Storm 正是最流行的实时流计算框架之一,它提供了可容错分布式计算所要求的基本原语和保障机制,可以满足大容量关键业务应用的需求。它不但是一套技术的整合,也是一种数据流和控制的机制。很多大公司都将 Storm 作为大数据处理平台的核心部分,它具有每天或者每小时处理上亿次事务的能力。
Storm topology 的组成部分——stream、spout 和 bolt
Strom分布式计算结构成为topology(拓扑),由stream(数据流),spout(数据量的生成者),bolt(运算)组成。如图所示:
- stream:Storm 的核心数据结构是 tuple。tuple是包含了一个或者多个键值对的列表,Stream 是由无限制的 tuple 组成的序列。
- spout:spout 代表了一个 Storm topology 的主要数据入口,充当采集器的角色,连接到数据源,将数据转化为一个个 tuple,并将 tuple 作为数据流进行发射。因为 spout 通常不会用来实现业务逻辑,所以在多个 topology 中常常可以复用。
- bolt:bolt 可以理解为计算程序中的运算或者函数,将一个或者多个数据流作为输入,对数据实施运算后,选择性地输出一个或者多个数据流。bolt 可以订阅多个由 spout 或者其他bolt 发射的数据流,这样就可以建立复杂的数据流转换网络。像 Spout API 一样,bolt 可以执行各式各样的处理功能,bolt 的编程接口简单明了,bolt 可以执行的典型功能包括:
● 过滤 tuple
● 连接(join)和聚合操作(aggregation)
● 计算
● 数据库读写
Storm 的并发机制
在 Storm 的间接中提到过,Storm 计算支持在多台机器上水平扩容,通过将计算切分为多个独立的 tasks 在集群上并发执行来实现。在 Storm 中,一个 task 可以简单地理解为在集群某节点上运行的一个 spout 或者 bolt 实例。
为了理解storm 的并发机制是如何运行的,我们先来解释下在集群中运行的topology 的四个主要组成部分:
● Nodes (服务器):指配置在一个Storm 集群中的服务器,会执行topology 的一部分 运算。一个Storm 集群可以包括一个或者多个工作node。
● Workers (JVM 虚拟机):指一个node 上相互独立运行的JVM 进程。每个node 可以配置运行一个或者多个worker。一个topology 会分配到一个或者多个worker 上 运行。
● Executeor (线程):指一个worker 的jvm 进程中运行的Java 线程。多个task 可以指派给同一个executer 来执行。除非是明确指定,Storm 默认会给每个executor 分配一个task。
● Task (bolt/spout 实 例):task 是spout 和bolt 的 实 例, 它 们 的nextTuple() 和execute() 方法会被executors 线程调用执行。
要重点指出的是,当topology 执行在本地模式时,增加worker 的数量不会达到提 高速度的效果。因为topology 在本地模式下是在同一个JVM 进程中执行的,所以只有 增加task 和executor 的并发度配置才会生效。Storm 的本地模式提供了接近集群模式的 模拟,对本地开发有帮助。但程序在投入生产环境之前,必须在真实的集群环境下进行 测试。
理解数据流分组
Storm 定义了七种内置数据流分组的方式:
● Shuffle grouping (随机分组):这种方式会随机分发tuple 给bolt 的各个task ,每个 bolt 实例接收到的相同数量的tuple。
● Fields grouping (按字段分组):根据指定字段的值进行分组。比如说,一个数据流根据“word ”字段进行分组,所有具有相同“word ”字段值的tuple 会路由到同一 个bolt 的task 中。
● All grouping (全复制分组):将所有的tuple 复制后分发给所有bolt task。每个订阅数据流的task 都会接收到tuple 的拷贝。
● Globle grouping(全局分组):这种分组方式将所有的tuples 路由到唯一一个task 上。Storm 按照最小的task ID 来选取接收数据的task。注意,当使用全局分组方式时, 设置bolt 的task 并发度是没有意义的,因为所有tuple 都转发到同一个task 上了。使用全局分组的时候需要注意,因为所有的tuple 都转发到一个JVM 实例上,可 能会引起Storm 集群中某个JVM 或者服务器出现性能瓶颈或崩溃。
● None grouping (不分组):在功能上和随机分组相同,是为将来预留的。
● Direct grouping (指向型分组):数据源会调用emitDirect() 方法来判断一个tuple 应该由哪个Storm 组件来接收。只能在声明了是指向型的数据流上使用。
● Local or shuffle grouping (本地或随机分组):和随机分组类似,但是,会将tuple 分 发给同一个worker 内的bolt task (如果worker 内有接收数据的bolt task)。其他情况下,采用随机分组的方式。取决于topology 的并发度,本地或随机分组可以减 少网络传输,从而提高topology 性能。
- 除了预定义好的分组方式之外,还可以通过实现CustomStreamGrouping (自定义分组)
有保障机制的数据处理
Storm 提供了一种API 能够保证spout 发送出来的每个tuple 都能够执行完整的处理过程。在我们上面的例子中,不担心执行失败的情况。可以看到在一个topology 中一个spout 的数据流会被分割生成任意多的数据流,取决于下游bolt 的行为。如果发生了执行 失败会怎样?举个例子,考虑一个负责将数据持久化到数据库的bolt。怎样处理数据库更 新失败的情况?
- spout的可靠性
在Storm 中,可靠的消息处理机制是从spout 开始的。一个提供了可靠的处理机制的 spout 需要记录它发射出去的tuple ,当下游bolt 处理tuple 或者子tuple 失败时spout 能够重新发射。子tuple 可以理解为bolt 处理spout 发射的原始tuple 后,作为结果发射出去的tuple。另外一个视角来看,可以将spout 发射的数据流看作一个tuple 树的主干,如图所示:
在图中,实线部分表示从spout 发射的原始主干tuple ,虚线部分表示的子tuple 都 是源自于原始tuple。这样产生的图形叫做tuple 树。在有保障数据的处理过程中,bolt每收到一个tuple ,都需要向上游确认应答(ack)者报错。对主干tuple 中的一个tuple , 如果tuple 树上的每个bolt 进行了确认应答,spout 会调用ack 方法来标明这条消息已经 完全处理了。如果树中任何一个bolt 处理tuple 报错,或者处理超时,spout 会调用fail 方法。
Storm 的ISpout 接口定义了三个可靠性相关的API :nextTuple ,ack 和fail。
前面讲过,Storm 通过调用Spout 的nextTuple() 发送一个tuple。为实现可靠的消息处理,首先要给每个发出的tuple 带上唯一的ID, 并且将ID 作为参数传递给SpoutOutputCollector的emit() 方法:
给tuple 指定ID 告诉Storm 系统,无论执行成功还是失败,spout 都要接收tuple 树上 所有节点返回的通知。如果处理成功,spout 的ack() 方法将会对编号是ID 的消息应答确 认,如果执行失败或者超时,会调用fail() 方法。
2.bolt的可靠性
bolt 要实现可靠的消息处理机制包含两个步骤:
1. 当发射衍生的tuple 时,需要锚定读入的tuple
2. 当处理消息成功或者失败时分别确认应答或者报错
锚定一个tuple 的意思是,建立读入tuple 和衍生出的tuple 之间的对应关系,这样下 游的bolt 就可以通过应答确认、报错或超时来加入到tuple 树结构中。 可以通过调用OutputCollector 中emit() 的一个重载函数锚定一个或者一组tuple :
collerctor.emit(tuple,new Values(words));
这里,我们将读入的tuple 和发射的新tuple 锚定起来,下游的bolt 就需要对输出的 tuple 进行确认应答或者报错。另外一个emit() 方法会发射非锚定的tuple :
collector.emit(new Values(words));
非锚定的tuple 不会对数据流的可靠性起作用。如果一个非锚定的tuple 在下游处理失 败,原始的根tuple 不会重新发送。
当处理完成或者发送了新tuple 之后,可靠数据流中的bolt 需要应答读入的tuple :
this.collector.ack(tuple);
如果处理失败,这样的话spout 必须发射tuple ,bolt 就要明确地对处理失败的tuple 报错:
this.collector.fail(tuple);
如果因为超时的原因,或者显式调用OutputCollector.fail() 方法,spout 都会重新发送 原始tuple。
分布式单词计数器案例代码地址:github案例代码例子
接下来将分享配置strom集群,不要错过噢~