基于Storm的WordCount
storm wordcount 工作过程
storm 版本:
1、spout 从外部数据源中读取数据,随机发送一个元组对象出去;
2、splitbolt 接收 spout 中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去;
3、wordcountbolt 接收 splitbolt 中输出的单词数组,对里面单词的频率进行累加,将累加后的结果输出。
java 版本:
1、读取文件中的数据,一行一行的读取;
2、将读到的数据进行切割;
3、对切割后的数组中的单词进行计算。
hadoop 版本:
1、按行读取文件中的数据;
2、在 mapper()函数中对每一行的数据进行切割,并输出切割后的数据数组;
3、接收 mapper()中输出的数据数组,在 reducer()函数中对数组中的单词进行计算,将计算后的统计结果输出。
源代码
storm的配置、eclipse里maven的配置以及创建项目部分省略。
mainclass
package com.test.stormwordcount; import backtype.storm.config; import backtype.storm.localcluster; import backtype.storm.stormsubmitter; import backtype.storm.generated.alreadyaliveexception; import backtype.storm.generated.invalidtopologyexception; import backtype.storm.topology.topologybuilder; import backtype.storm.tuple.fields; public class mainclass { public static void main(string[] args) throws alreadyaliveexception, invalidtopologyexception { //创建一个 topologybuilder topologybuilder tb = new topologybuilder(); tb.setspout("spoutbolt", new spoutbolt(), 2); tb.setbolt("splitbolt", new splitbolt(), 2).shufflegrouping("spoutbolt"); tb.setbolt("countbolt", new countbolt(), 4).fieldsgrouping("splitbolt", new fields("word")); //创建配置 config conf = new config(); //设置 worker 数量 conf.setnumworkers(2); //提交任务 //集群提交 //stormsubmitter.submittopology("mywordcount", conf, tb.createtopology()); //本地提交 localcluster localcluster = new localcluster(); localcluster.submittopology("mywordcount", conf, tb.createtopology()); } }
splitbolt 部分
package com.test.stormwordcount; import java.util.map; import backtype.storm.task.outputcollector; import backtype.storm.task.topologycontext; import backtype.storm.topology.outputfieldsdeclarer; import backtype.storm.topology.base.baserichbolt; import backtype.storm.tuple.fields; import backtype.storm.tuple.tuple; import backtype.storm.tuple.values; public class splitbolt extends baserichbolt{ outputcollector collector; /** * 初始化 */ public void prepare(map stormconf, topologycontext context, outputcollector collector) { this.collector = collector; } /** * 执行方法 */ public void execute(tuple input) { string line = input.getstring(0); string[] split = line.split(" "); for (string word : split) { collector.emit(new values(word)); } } /** * 输出 */ public void declareoutputfields(outputfieldsdeclarer declarer) { declarer.declare(new fields("word")); } }
countbolt 部分
package com.test.stormwordcount; import java.util.hashmap; import java.util.map; import backtype.storm.task.outputcollector; import backtype.storm.task.topologycontext; import backtype.storm.topology.outputfieldsdeclarer; import backtype.storm.topology.base.baserichbolt; import backtype.storm.tuple.tuple; public class countbolt extends baserichbolt{ outputcollector collector; map<string, integer> map = new hashmap<string, integer>(); /** * 初始化 */ public void prepare(map stormconf, topologycontext context, outputcollector collector) { this.collector = collector; } /** * 执行方法 */ public void execute(tuple input) { string word = input.getstring(0); if(map.containskey(word)){ integer c = map.get(word); map.put(word, c+1); }else{ map.put(word, 1); } //测试输出 system.out.println("结果:"+map); } /** * 输出 */ public void declareoutputfields(outputfieldsdeclarer declarer) { } }
spoutbolt 部分
package com.test.stormwordcount; import java.util.map; import backtype.storm.spout.spoutoutputcollector; import backtype.storm.task.topologycontext; import backtype.storm.topology.outputfieldsdeclarer; import backtype.storm.topology.base.baserichspout; import backtype.storm.tuple.fields; import backtype.storm.tuple.values; public class spoutbolt extends baserichspout{ spoutoutputcollector collector; /** * 初始化方法 */ public void open(map map, topologycontext context, spoutoutputcollector collector) { this.collector = collector; } /** * 重复调用方法 */ public void nexttuple() { collector.emit(new values("hello world this is a test")); } /** * 输出 */ public void declareoutputfields(outputfieldsdeclarer declarer) { declarer.declare(new fields("test")); } }
pom.xml 文件内容
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelversion>4.0.0</modelversion> <groupid>com.test</groupid> <artifactid>stormwordcount</artifactid> <version>0.9.6</version> <packaging>jar</packaging> <name>stormwordcount</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceencoding>utf-8</project.build.sourceencoding> </properties> <dependencies> <dependency> <groupid>junit</groupid> <artifactid>junit</artifactid> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupid>org.apache.storm</groupid> <artifactid>storm-core</artifactid> <version>0.9.6</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactid>maven-assembly-plugin</artifactid> <configuration> <descriptorrefs> <descriptorref>jar-with-dependencies</descriptorref> </descriptorrefs> <archive> <manifest> <mainclass>com.test.stormwordcount.mainclass</mainclass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-compiler-plugin</artifactid> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build>
遇到的问题
基于storm的wordcount需要eclipse安装了maven插件,之前的大数据实践安装的eclipse版本为eclipse ide for eclipse committers4.5.2,这个版本不自带maven插件,后续安装失败了几次(网上很多的教程都已经失效),这里分享一下我成功安装的方法:
使用链接下载,help->install new software
点击add,name输入随意,在location输入下载eclipse的maven插件,下载地址可以这样获取
点击连接: 进入网站后点击download,拉到最下面可以看到很多eclipse maven插件的版本和发布时间,选在适合eclipse的版本复制链接即可。建议取消选中contack all update sites during install to find required software(耗时太久)。
但是安装成功后还是无法配置(这里原因不太清楚,没找到解决办法),就直接上官网换成自己maven插件的javaee ide了...
后续的maven的配置这些都比较顺利,第一次创建maven-archetype-quickstat项目报错,试了网上很多办法都还没成功,然后打开 windows->preferencs->maven->installation发现之前配置了的maven的安装路径没了...重新配置了下就可以创建项目了。
最后运行成功的结果: