Storm框架整合springboot的方法
storm:最火的流式处理框架
伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高。举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了。再举一个推荐的例子,如果用户昨天在淘宝上买了一双袜子,今天想买一副泳镜去游泳,但是却发现系统在不遗余力地给他推荐袜子、鞋子,根本对他今天寻找泳镜的行为视而不见,估计这哥们心里就会想推荐你妹呀。其实稍微了解点背景知识的码农们都知道,这是因为后台系统做的是每天一次的全量处理,而且大多是在夜深人静之时做的,那么你今天白天做的事情当然要明天才能反映出来啦。
•实现一个实时计算系统
全量数据处理使用的大多是鼎鼎大名的hadoop或者hive,作为一个批处理系统,hadoop以其吞吐量大、自动容错等优点,在海量数据处理上得到了广泛的使用。但是,hadoop不擅长实时计算,因为它天然就是为批处理而生的,这也是业界一致的共识。否则最近这两年也不会有s4,storm,puma这些实时计算系统如雨后春笋般冒出来啦。先抛开s4,storm,puma这些系统不谈,我们首先来看一下,如果让我们自己设计一个实时计算系统,我们要解决哪些问题。
1.低延迟。都说了是实时计算系统了,延迟是一定要低的。
2.高性能。性能不高就是浪费机器,浪费机器是要受批评的哦。
3.分布式。系统都是为应用场景而生的,如果你的应用场景、你的数据和计算单机就能搞定,那么不用考虑这些复杂的问题了。我们所说的是单机搞不定的情况。
4.可扩展。伴随着业务的发展,我们的数据量、计算量可能会越来越大,所以希望这个系统是可扩展的。
5.容错。这是分布式系统中通用问题。一个节点挂了不能影响我的应用。
好,如果仅仅需要解决这5个问题,可能会有无数种方案,而且各有千秋,随便举一种方案,使用消息队列+分布在各个机器上的工作进程就ok啦。我们再继续往下看。
1.容易在上面开发应用程序。亲,你设计的系统需要应用程序开发人员考虑各个处理组件的分布、消息的传递吗?如果是,那有点麻烦啊,开发人员可能会用不好,也不会想去用。
2.消息不丢失。用户发布的一个宝贝消息不能在实时处理的时候给丢了,对吧?更严格一点,如果是一个精确数据统计的应用,那么它处理的消息要不多不少才行。这个要求有点高哦。
诞 生
在2011年storm开源之前,由于hadoop的火红,整个业界都在喋喋不休地谈论大数据。hadoop的高吞吐,海量数据处理的能力使得人们可以方便地处理海量数据。但是,hadoop的缺点也和它的优点同样鲜明——延迟大,响应缓慢,运维复杂。
有需求也就有创造,在hadoop基本奠定了大数据霸主地位的时候,很多的开源项目都是以弥补hadoop的实时性为目标而被创造出来。而在这个节骨眼上storm横空出世了。
storm带着流式计算的标签华丽丽滴出场了,看看它的一些卖点:
•分布式系统:可横向拓展,现在的项目不带个分布式特性都不好意思开源。
•运维简单:storm的部署的确简单。虽然没有mongodb的解压即用那么简单,但是它也就是多安装两个依赖库而已。
•高度容错:模块都是无状态的,随时宕机重启。
•无数据丢失:storm创新性提出的ack消息追踪框架和复杂的事务性处理,能够满足很多级别的数据处理需求。不过,越高的数据处理需求,性能下降越严重。
•多语言:实际上,storm的多语言更像是临时添加上去似的。因为,你的提交部分还是要使用java实现。
下面介绍下storm框架整合springboot的方法
我们知道storm本身是一个独立运行的分布式流式数据处理框架,springboot
也是一个独立运行的web框架。那么如何在strom框架中集成springboot
使得我们能够在storm开发中运用spring的ioc容器及其他如spring jpa
等功能呢?我们先来了解以下概念:
•storm主要的三个component:topology、spout、bolt。topology作为主进程控制着spout、bolt线程的运行,他们相当于独立运行的容器分布于storm集群中的各个机器节点。
•springapplication:是配置spring应用上下文的起点。通过调用springapplication.run()方法它将创建applicationcontext实例,这是我们能够使用ioc容器的主要beanfactory。之后spring将会加载所有单例模式的beans,并启动后台运行的commandlinerunner beans等。
•applicationcontextaware:这是我们能够在普通java类中调用spring容器里的beans的关键接口。
实现原理
storm框架中的每个spout和bolt都相当于独立的应用,strom在启动spout和bolt时提供了一个open方法(spout)和prepare方法(bolt)。我们可以把初始化spring应用的操作放在这里,这样可以保证每个spout/bolt应用在后续执行过程中都能获取到spring的applicationcontext,有了applicationcontext实例对象,spring的所有功能就都能用上了。
•spout.open方法实现
@override public void open(map map, topologycontext topologycontext, spoutoutputcollector spoutoutputcollector) { //启动springboot应用 springstormapplication.run(); this.map = map; this.topologycontext = topologycontext; this.spoutoutputcollector = spoutoutputcollector; }
•bolt.prepare方法实现
@override public void prepare(map map, topologycontext topologycontext, outputcollector outputcollector) { //启动springboot应用 springstormapplication.run(); this.map = map; this.topologycontext = topologycontext; this.outputcollector = outputcollector; } •springstormapplication启动类 @springbootapplication @componentscan(value = "com.xxx.storm") public class springstormapplication { /** * 非工程启动入口,所以不用main方法 * @param args */ public static void run(string ...args) { springapplication app = new springapplication(springstormapplication.class); //我们并不需要web servlet功能,所以设置为webapplicationtype.none app.setwebapplicationtype(webapplicationtype.none); //忽略掉banner输出 app.setbannermode(banner.mode.off); //忽略spring启动信息日志 app.setlogstartupinfo(false); app.run(args); } }
与我们传统的springboot应用启动入口稍微有点区别,主要禁用了web功能,看下正常的启动方式:
@springbootapplication @componentscan(value = "com.xxx.web") public class platformapplication { public static void main(string[] args) { springapplication.run(platformapplication.class, args); } }
•在spout/bolt中调用了springstormapplication.run方法后,我们还需要能够拿到applicationcontext容器对象,这时候我们还需要实现applicationcontextaware接口,写个工具类beanutils:
@component public class beanutils implements applicationcontextaware { private static applicationcontext applicationcontext = null; @override public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception { if (beanutils.applicationcontext == null) { beanutils.applicationcontext = applicationcontext; } } public static applicationcontext getapplicationcontext() { return applicationcontext; } public static object getbean(string name) { return getapplicationcontext().getbean(name); } public static <t> t getbean(class<t> clazz) { return getapplicationcontext().getbean(clazz); } public static <t> t getbean(string name, class<t> clazz) { return getapplicationcontext().getbean(name, clazz); } }
通过@component
注解使得spring在启动时能够扫描到该bean,因为beanutils实现了applicationcontextaware
接口,spring会在启动成功时自动调用beanutils.setapplicationcontext
方法,将applicationcontext
对象保存到工具类的静态变量中,之后我们就可以使用beanutils.getbean()
去获取spring容器中的bean了。
写个简单例子
•在filterbolt的execute方法中获取spring bean
@override public void execute(tuple tuple) { filterservice filterservice = (filterservice) beanutils.getbean("filterservice"); filterservice.deleteall(); }
•定义filterservice类,这时候我们就可以使用spring的相关注解,自动注入,spring jpa等功能了。
@service("filterservice") public class filterservice { @autowired userrepository userrepository; public void deleteall() { userrepository.deleteall(); } }
将storm应用作为springboot工程的一个子模块
工程主目录的pom文件还是springboot相关的依赖,在storm子模块中引入storm依赖,这时候启动strom的topology应用会有一个日志包依赖冲突。
slf4j: class path contains multiple slf4j bindings. slf4j: found binding in [jar:file:/applications/intellij%20idea.app/contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/staticloggerbinder.class] slf4j: found binding in [jar:file:/applications/intellij%20idea.app/contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/staticloggerbinder.class] slf4j: see http://www.slf4j.org/codes.html#multiple_bindings for an explanation. slf4j: actual binding is of type [org.apache.logging.slf4j.log4jloggerfactory]
我们需要在storm子模块的pom文件中重写org.springframework.boot:spring-boot-starter包依赖,将springboot的相关日志包排除掉,如下:
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter</artifactid> <exclusions> <exclusion> <groupid>org.apache.logging.log4j</groupid> <artifactid>log4j-to-slf4j2</artifactid> </exclusion> <exclusion> <groupid>ch.qos.logback</groupid> <artifactid>logback-classic2</artifactid> </exclusion> </exclusions> </dependency>
总结
以上所述是小编给大家介绍的storm框架整合springboot的方法,希望对大家有所帮助
推荐阅读