Storm入门
前言
只有光头才能变强。
文本已收录至我的github精选文章,欢迎star:https://github.com/zhongfucheng3y/3y
听说过大数据的同学应该都听说过storm吧?其实我现在负责的系统用的就是storm,在最开始接手系统的时候,我是完全不了解storm的(现在其实也是一知半解而已)
由于最近在整理系统,所以顺便花了点时间入门了一下storm(前几天花了点时间改了一下,上线以后一堆bug,于是就果断回滚了。)
这篇文章来讲讲简单storm的简单使用,没有复杂的东西。看完这篇文章,等到接手storm的代码的时候你们**『大概』『应该』**能看懂storm的代码。
什么是storm
我们首先进官方看一下storm的介绍:
apache storm is a free and open source distributed realtime computation system
storm是一个分布式的实时计算系统。
分布式:我在之前已经写过挺多的分布式的系统了,比如kafka/hdfs/elasticsearch等等。现在看到分布式这个词,三歪第一反应就是「它的存储或者计算交由多台服务器上完成,最后汇总起来达到最终的效果」。
实时:处理速度是毫秒级或者秒级的
计算:可以简单理解为对数据进行处理,比如清洗数据(对数据进行规整,取出有用的数据)。
我们使用storm做了什么?
我现在做的消息管理平台是可以推送各类的消息的(im/push/短信/微信消息等等),消息下发后,我们是肯定要知道这条消息的下发情况的(是否发送成功,如果用户没收到是由于什么原因导致用户没收到,消息是否被点击了等等)。
消息是否成功下发到用户上,这是运营和客服经常关心的问题。
消息下发的效果,这是运营非常关心的问题
基于上面问题,我们用了storm做了一套自己的埋点方案,帮助我们快速确认消息是否成功下发到用户上以及统计消息下发的效果。
听起来好像很牛逼,下面我来讲讲背景,看完你就会发现一点儿都不难。
需求背景
消息管理平台虽然看起来只是发消息的,但是系统设计还是有点东西的。我们以「微服务」的思想去看这个系统,会将不同的功能模块抽取到不同的系统的。
其中push(推送)的链路是最长的,一条消息下发经过的后端系统就有7个,如图下:
这7个系统都有可能「干掉」了这条消息,导致用户没收到。如果我们每去查一个问题,都要逐一排查每个系统,那实在是太慢了。
很多时候客服反馈过来的问题都是当天的,甚至是前几分钟的,我们需要有一个及时的反馈给客服来帮助用户找到为什么收不到消息的原因。
于是我们要做两个功能:
能够查询用户当天所有的消息下发情况。(能够快速定位是哪个系统什么原因导致用户收不到消息) 查询某条消息的实时整体下发情况。(能够快速查看该消息的整体下发情况,包括下发量,中途过滤的量以及点击量)
如果是单纯查问题,我们将各个系统的日志收集到kafka,然后写到elasticsearch这个是完全没问题的(现在我们也是这么干的)
涉及到统计相关的,我们就有自己的一套埋点方案,这个是便于对数据的统计,也能完成部分排查的功能。
需求实现
前面提到了「埋点」,实际上就是打日志。其实就是在关键的地方上打上日志做记录,方便排查问题。
比如,现在我们有7个系统,每个系统在执行消息的时候都会可能导致这条消息发不出去(可能是消息去重了,可能是用户的手机号不正确,可能是用户太久没有登录了等等都有可能)。我们在这些『关键位置』都打上日志,方便我们去排查。
这些「关键位置」我们都给它用简单的数字来命个名。比如说:我们用「11」来代表这个用户没有绑定手机号,用「12」来代表这个用户10分钟前收到了一条一模一样的消息,用「13」来代表这个用户屏蔽了消息.....
「11」「12」「13」「14」「15」「16」这些就叫做「点位」,把这些点位在关键的位置中打上日志,这个就叫做「埋点」
有了埋点,我们要做的就是将这些点位收集起来,然后统一处理成我们的格式,输出到数据源中。
ok,就是分三步:
收集日志 清洗日志 输出到数据源
收集日志我们有logagent帮我们收集到kafka,实时清洗日志我们用的就是storm,清洗完我们输出到redis(实时)/hive(离线)。
storm一般是在处理(清洗)那层,storm的上下游也很明确了(上游是消息队列,下游写到各种数据源,这种是最常见的):
storm统一清洗出来放到redis,我们就可以通过接口来很方便去查一条消息的整体下发情况,比如:
到这里,主要想说明我们通过storm来实时清洗数据,下来来讲讲storm的基本使用~
storm入门
我们从一段最简单的storm代码入门,先看看下面的代码:
如果完全没看过storm代码的同学,看到上面的代码会怎么分析?我是这样的:
首先有一个topologybuilder的东西,这个东西可能是storm的构造器之类的 然后设置了spout和bolt(但是我不知道这两个东西是用来干嘛的,但是我可以点进去对象里边看看做了什么) 然后设置了一下config配置(应该是设置storm分配多少内存,多少线程之类的,反正跟配置相关) 最后用stormsubmitter提交任务,把配置和topologybuilder的内容给提交上去。
我们简单搜一下,就可以发现它的流程大致是这样的:
spout是数据的源头,一般我们用它去接收数据,spout接收到数据后往bolt上发送,bolt处理数据(清洗)。bolt清洗完数据可以写到一个数据源或者传递给下一个bolt继续清洗。
topology关联了我们在程序中定义好的spout和bolt。各种 spout 和 bolt 连接在一起之后,就成了一个 topology,一个 topology 就是一个 storm 应用。
spout往bolt传递数据,bolt往bolt传递数据,这个传递的过程叫做stream,stream传递的是一个一个tuple。
现在问题来了,我们的spout和bolt之间是怎么关联起来的呢?bolt和bolt之间是怎么关联起来的呢?
在上面的图我们知道一个topology会有多个spout和多个bolt,那我怎么知道这个spout传递的数据是给这个bolt,这个bolt传递的数据是给另外一个bolt?(说白了,就是上面图上的箭头是怎么关联的呢?)
在storm中,有grouping的机制,就是决定spout的数据流向哪个bolt,bolt的数据流向下一个bolt。
为了提高并发度,我们在setbolt的时候,可以指定bolt的线程数,也就是所谓的executor(spout也同样可以指定线程数的,只是这次我拿bolt来举例)。我们的结构可能会是这样的:
分组的策略有以下:
1)shufflegrouping(随机分组) 2)fieldsgrouping(按照字段分组,在这里即是同一个单词只能发送给一个bolt) 3)allgrouping(广播发送,即每一个tuple,每一个bolt都会收到) 4)globalgrouping(全局分组,将tuple分配到task id值最低的task里面) 5)nonegrouping(随机分派) 6)directgrouping(直接分组,指定tuple与bolt的对应发送关系) 7)local or shuffle grouping 8)partialkeygrouping(关键字分组,与按字段分组很相似,但他分配更加均衡) 9)customgrouping (自定义的grouping)
shufflegrouping策略我们是用得最多的,比如上面的图上有两个spout,我们会将这两个spout的tuple均匀分发到各个bolt中执行。
说到这里,我们再回头看看最开始的代码,我给补充一下注释,你们应该就能看得懂了:
我还是再画一个图吧:
入门的过程复杂吗?不复杂。说白了就是spout接收到数据,通过grouping机制将spout的数据传到给bolt处理,bolt处理完看还需不需要继续往下处理,如果需要就传递给下一个bolt,不需要就写到数据源、调接口等等。
storm架构
当我们提交任务之后,会发生什么呢?我们来看看。
任务提交后,会被上传到nimbus节点上,它是主控节点,负责分配代码、布置任务及检测故障 nimbus会去zookeeper上读取整个集群的信息,将任务交给supervisor,它是工作节点,负责创建、执行任务 supervisor创建worker进程,每个worker对应一个topology的子集。worker是task的容器,task是真正的任务执行者。
流程大致如下:
nimbus和supervisor都是节点(服务器),storm用zookeeper去管理supervisor节点的信息。
supervisor节点下会创建worker进程,创建多少个worker进程由conf配置文件决定。线程executor,由进程产生,用于执行任务,executor线程数有多少个是在setbolt、setspout的时候决定。task是真正的任务执行者,task其实就是包装了bolt/spout实例。
关于worker、executor、task之间的关系,在官网有一个例子专门说明了,我们可以看看。先放出代码:
内部的图:
解释一下:
默认情况下:如果不指定tasks数,那么一个线程会有一个task conf.setnumworkers(2)
代表会创建两个worker进程setspout("blue-spout", new bluespout(), 2)
蓝色spout会有两个线程处理,因为有两个进程,所以一个进程会有一个蓝色spout线程topologybuilder.setbolt("green-bolt", new greenbolt(), 2).setnumtasks(4)
绿色bolt会有两个线程处理,因为有两个进程worker所以一个进程会有一个绿色bolt线程。又因为设置了4个task数,所以一个线程会分配两个绿色的tasktopologybuilder.setbolt("yellow-bolt", new yellowbolt(), 6).shufflegrouping("green-bolt")
。黄色bolt会有6个线程处理,因为创建了两个进程,所以一个进程会有3个黄色bolt线程。没有单独设置task书,所以一个线程默认有一个task
从上面我们可以知道threads ≤ tasks
线程数是肯定小于等于task数的。有没有好奇宝宝会问:「storm用了线程,那么会有线程不安全的情况吗?」(其实这是三歪刚学的疑问)
一般来说不会,因为很多情况下,一个线程是对应一个task的(task你可以理解为bolt/spout的实例),既然每个线程是处理自己的实例了,那当然不会有线程安全的问题啦。(当然了,你如果在bolt/spout中设置了静态成员变量,那还是会有线程安全问题)
最后
这篇文章简单地介绍了一下storm,storm的东西其实还有很多,包括ack机制什么的。现在进官方找文档,都在主推trident了,有兴趣的同学可以继续往下看。
话又说回来,我司也在主推flink了,这块后续如果有迁移计划,我也准备学学搞搞,到时候再来分享分享入门文章。
参考资料:
各类知识点总结
下面的文章都有对应的原创精美pdf,在持续更新中,可以来找我催更~
92页的mybatis 129页的多线程 141页的servlet 158页的jsp 76页的集合 64页的jdbc 105页的数据结构和算法 142页的spring 58页的过滤器和监听器 30页的http hibernate ajax redis ......
涵盖java后端所有知识点的开源项目(已有7 k star):https://github.com/zhongfucheng3y/3y
如果大家想要实时关注我更新的文章以及分享的干货的话,微信搜索java3y。
pdf文档的内容均为手打,有任何的不懂都可以直接来问我(公众号有我的联系方式)。
本文使用 排版