Stream Data Processing and Analysis
Environment
Item              Version / value
System            Ubuntu 18.04.4 LTS
Memory            7.5 GiB
Processor         Intel Core i7-8565U CPU @ 1.80GHz × 8
Graphics          Intel UHD Graphics (Whiskey Lake 3×8 GT2)
GNOME             3.28.2
OS type           64-bit
Disk              251.0 GB
Storm             2.1.0
Zookeeper         3.5.8
IntelliJ IDEA     Community 2020.1
Maven             3.6.0
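Steps
① Install ZooKeeper.
ZooKeeper acts as the coordinator: in a distributed environment it coordinates the work of the individual Storm nodes, so Storm cannot run without it.
First download the prebuilt package, apache-zookeeper-3.5.8.tar.gz (note that for ZooKeeper 3.5.5 and later the prebuilt archive on the download page carries a -bin suffix, apache-zookeeper-3.5.8-bin.tar.gz, while the archive without the suffix is the source release). Extract it into /usr/local and rename the directory to zookeeper. Then copy zoo_sample.cfg in the conf directory to zoo.cfg, which is ZooKeeper's configuration file, and edit it. The edited file looks like this: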
# The number of milliseconds of each tick
tickTime=8000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper/tmp
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

admin.serverPort=8080

zookeeper.client.sasl=false
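The tick-based settings in this file are easier to judge once they are converted to milliseconds. The following is a minimal sketch in plain Java (the class and method names are hypothetical, chosen only for illustration). It assumes that minSessionTimeout and maxSessionTimeout are not set in zoo.cfg, in which case ZooKeeper falls back to 2 × tickTime and 20 × tickTime.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: derives millisecond timeouts from zoo.cfg style settings. */
final class ZooTimeouts {

    /** Parses zoo.cfg text; its key=value lines and # comments are readable by Properties. */
    static Properties parse(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    private static long tickTime(Properties props) {
        return Long.parseLong(props.getProperty("tickTime").trim());
    }

    /** Converts a setting expressed in ticks (initLimit, syncLimit) to milliseconds. */
    static long ticksToMillis(Properties props, String key) {
        return tickTime(props) * Long.parseLong(props.getProperty(key).trim());
    }

    /** Default lower bound for client session timeouts: 2 ticks. */
    static long minSessionTimeout(Properties props) {
        return 2 * tickTime(props);
    }

    /** Default upper bound for client session timeouts: 20 ticks. */
    static long maxSessionTimeout(Properties props) {
        return 20 * tickTime(props);
    }

    /** True if a requested session timeout lies inside the default bounds. */
    static boolean sessionTimeoutAllowed(Properties props, long requestedMillis) {
        return requestedMillis >= minSessionTimeout(props)
                && requestedMillis <= maxSessionTimeout(props);
    }

    public static void main(String[] args) {
        Properties props = parse("tickTime=8000\ninitLimit=10\nsyncLimit=5\n");
        System.out.println("initLimit ms: " + ticksToMillis(props, "initLimit"));
        System.out.println("syncLimit ms: " + ticksToMillis(props, "syncLimit"));
        System.out.println("session range ms: " + minSessionTimeout(props)
                + " .. " + maxSessionTimeout(props));
        // 120000 is the storm.zookeeper.session.timeout used in this walkthrough.
        System.out.println("120000 allowed: " + sessionTimeoutAllowed(props, 120000L));
    }
}
```

Under these assumptions, tickTime=8000 gives a session-timeout range wide enough for the 120000 ms session timeout that Storm is configured with in the next step.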
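② Install and configure Storm.
Download the latest Storm package from the official site; here it is apache-storm-2.1.0.tar.gz. Extract it into /usr/local and rename the directory to storm. Next edit Storm's configuration files, starting with storm.yaml. storm.zookeeper.port is set to 2181 because that is the port ZooKeeper opens for clients. storm.local.dir sets the data directory to /usr/local/storm/data/storm. ui.port is set to 8081 (Storm's default UI port, 8080, is already used by the ZooKeeper admin server configured above with admin.serverPort=8080). After running the storm ui command, the UI can be opened in a browser on port 8081.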
storm.zookeeper.servers:
    - "localhost"
nimbus.seeds: ["localhost"]
storm.local.dir: "/usr/local/storm/data/storm"
nimbus.host: "localhost"
storm.zookeeper.port: 2181
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
ui.port: 8081
storm.zookeeper.session.timeout: 120000
storm.zookeeper.connection.timeout: 90000
Then edit the storm-env.sh file and add one line that sets the JDK path:
export JAVA_HOME=${JAVA_HOME}
To make the commands easier to use, add two lines to the ~/.bashrc file:
export PATH=/usr/local/zookeeper/bin:$PATH
export PATH=/usr/local/storm/bin:$PATH
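③ Install Maven.
On Ubuntu, the command sudo apt-get install maven installs Maven. Afterwards the maven folder under /usr/share is the Maven installation directory. Copy its settings.xml file to /home/acat/.m2 and edit the copy. localRepository is the directory of the local repository. As the mirror element shows, the Aliyun mirror is used to speed up downloading jars.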
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
  <localRepository>/home/acat/.m2/repository</localRepository>
  <pluginGroups>
  </pluginGroups>
  <proxies>
  </proxies>
  <servers>
  </servers>
  <mirrors>
    <mirror>
      <id>nexus-aliyun</id>
      <mirrorOf>*,!jeecg,!jeecg-snapshots</mirrorOf>
      <name>Nexus aliyun</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>
  </mirrors>
  <profiles>
  </profiles>
</settings>
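The mirrorOf value *,!jeecg,!jeecg-snapshots means "mirror every repository except the two named ones". The sketch below models that rule in plain Java to make the selection explicit. It is a simplified illustration, not Maven's own code: the class name is hypothetical, and special tokens such as external:* are deliberately ignored.

```java
/** Hypothetical, simplified model of how a mirrorOf pattern selects repositories. */
final class MirrorOfSketch {

    /** Returns true if a mirror with this mirrorOf pattern would serve the repository id. */
    static boolean matches(String mirrorOf, String repoId) {
        boolean matched = false;
        for (String raw : mirrorOf.split(",")) {
            String token = raw.trim();
            if (token.equals("!" + repoId)) {
                return false; // an explicit exclusion always wins
            }
            if (token.equals("*") || token.equals(repoId)) {
                matched = true; // wildcard or exact id match
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        String pattern = "*,!jeecg,!jeecg-snapshots";
        for (String repo : new String[] {"central", "jeecg", "jeecg-snapshots"}) {
            System.out.println(repo + " mirrored: " + matches(pattern, repo));
        }
    }
}
```

With the pattern from this settings.xml, requests for central go through the Aliyun mirror, while the jeecg repositories would be contacted directly.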
④ Test whether the Storm environment was set up successfully.
First check which Java processes are currently running.
aaa@qq.com:~$ jps
9117 Jps
Then start ZooKeeper.
aaa@qq.com:~$ zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
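Next start the supervisor node. The storm supervisor, storm nimbus and storm ui commands stay in the foreground, so each one needs its own terminal (or has to be sent to the background).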
aaa@qq.com:~$ storm supervisor
Running: /home/acat/softwares/jdk1.8.0_161/bin/java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64 -Dstorm.conf.file= -cp /usr/local/storm/*:/usr/local/storm/lib/*:/usr/local/storm/extlib/*:/usr/local/storm/extlib-daemon/*:/usr/local/storm/conf -Xmx256m -Djava.deserialization.disabled=true -Dlogfile.name=supervisor.log -Dlog4j.configurationFile=/usr/local/storm/log4j2/cluster.xml org.apache.storm.daemon.supervisor.Supervisor
Then start the nimbus node.
aaa@qq.com:~$ storm nimbus
Running: /home/acat/softwares/jdk1.8.0_161/bin/java -server -Ddaemon.name=nimbus -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64 -Dstorm.conf.file= -cp /usr/local/storm/*:/usr/local/storm/lib/*:/usr/local/storm/extlib/*:/usr/local/storm/extlib-daemon/*:/usr/local/storm/conf -Xmx1024m -Djava.deserialization.disabled=true -Dlogfile.name=nimbus.log -Dlog4j.configurationFile=/usr/local/storm/log4j2/cluster.xml org.apache.storm.daemon.nimbus.Nimbus
Next start the UI process.
aaa@qq.com:~$ storm ui
Running: /home/acat/softwares/jdk1.8.0_161/bin/java -server -Ddaemon.name=ui -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64 -Dstorm.conf.file= -cp /usr/local/storm/*:/usr/local/storm/lib/*:/usr/local/storm/extlib/*:/usr/local/storm/extlib-daemon/*:/usr/local/storm/lib-webapp/*:/usr/local/storm/conf -Xmx768m -Djava.deserialization.disabled=true -Dlogfile.name=ui.log -Dlog4j.configurationFile=/usr/local/storm/log4j2/cluster.xml org.apache.storm.daemon.ui.UIServer
Then check the running Java processes again.
aaa@qq.com:~$ jps
9154 QuorumPeerMain
9450 Nimbus
10187 Jps
9771 UIServer
9196 Supervisor
aaa@qq.com:~$
QuorumPeerMain is the ZooKeeper process, and Nimbus, UIServer and Supervisor each have their own process as well. Opening http://localhost:8081 in a browser confirms that the cluster is up.
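Besides jps and the browser, the listening ports can be probed from code. The following is a small sketch in plain Java that tries a TCP connection to each port; the class name is hypothetical. The ports are the ones that appear in this walkthrough: 2181 (ZooKeeper clientPort), 6627 (the Nimbus port shown in the client logs) and 8081 (ui.port).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether something accepts TCP connections on a port. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        int[] ports = {2181, 6627, 8081};
        for (int port : ports) {
            System.out.println("localhost:" + port + " open: " + isOpen("localhost", port, 1000));
        }
    }
}
```

A successful connection only shows that some process is listening; jps is still needed to see which daemon it is.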
⑤ Run the bundled WordCount example.
First change into the storm-starter project directory. Before building, check the version of the Maven that was installed.
aaa@qq.com:~$ mvn --version
Apache Maven 3.6.0
Maven home: /usr/share/maven
Java version: 1.8.0_161, vendor: Oracle Corporation, runtime: /home/acat/softwares/jdk1.8.0_161/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "5.3.0-59-generic", arch: "amd64", family: "unix"
Then build the package with mvn clean package.
aaa@qq.com:storm-starter$ mvn clean package
[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.apache.storm:storm-starter >-------------------
[INFO] Building storm-starter 2.1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ storm-starter ---
[INFO] Deleting /home/acat/下载/apache-storm-2.1.0-src/apache-storm-2.1.0/storm-starter/target
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (cleanup) @ storm-starter ---
[INFO]
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven-version) @ storm-starter ---
[INFO]
[INFO] --- maven-checkstyle-plugin:3.0.0:check (validate) @ storm-starter ---
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ storm-starter ---
[INFO]
[INFO] --- maven-resources-plugin:3.1.0:resources (default-resources) @ storm-starter ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 5 resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.7.0:compile (default-compile) @ storm-starter ---
[INFO] Changes detected - recompiling the module!
... (several lines omitted) ...
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /home/acat/下载/apache-storm-2.1.0-src/apache-storm-2.1.0/storm-starter/target/storm-starter-2.1.0.jar with /home/acat/下载/apache-storm-2.1.0-src/apache-storm-2.1.0/storm-starter/target/storm-starter-2.1.0-shaded.jar
[INFO] Dependency-reduced POM written at: /home/acat/下载/apache-storm-2.1.0-src/apache-storm-2.1.0/storm-starter/dependency-reduced-pom.xml
[INFO]
[INFO] --- maven-site-plugin:3.7.1:attach-descriptor (attach-descriptor) @ storm-starter ---
[INFO] Skipping because packaging 'jar' is not pom.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 47.478 s
[INFO] Finished at: 2020-06-24T15:32:34+08:00
[INFO] ------------------------------------------------------------------------
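Run the bundled word-count example. The session below first kills a word-count topology that was still running, confirms with storm list that no topology is left, and then submits the newly built jar.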
aaa@qq.com:storm-starter$ storm kill word-count -w 2
Running: /home/acat/softwares/jdk1.8.0_161/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64 -Dstorm.conf.file= -cp /usr/local/storm/*:/usr/local/storm/lib/*:/usr/local/storm/extlib/*:/usr/local/storm/extlib-daemon/*:/usr/local/storm/conf:/usr/local/storm/bin org.apache.storm.command.KillTopology word-count -w 2
15:43:34.444 [main] WARN o.a.s.v.ConfigValidation - task.heartbeat.frequency.secs is a deprecated config please see class org.apache.storm.Config.TASK_HEARTBEAT_FREQUENCY_SECS for more information.
15:43:34.489 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : acat-xx:6627
15:43:34.502 [main] INFO o.a.s.c.KillTopology - Killed topology: word-count
aaa@qq.com:storm-starter$ storm list
Running: /home/acat/softwares/jdk1.8.0_161/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64 -Dstorm.conf.file= -cp /usr/local/storm/*:/usr/local/storm/lib/*:/usr/local/storm/extlib/*:/usr/local/storm/extlib-daemon/*:/usr/local/storm/conf:/usr/local/storm/bin org.apache.storm.command.ListTopologies
15:43:40.588 [main] WARN o.a.s.v.ConfigValidation - task.heartbeat.frequency.secs is a deprecated config please see class org.apache.storm.Config.TASK_HEARTBEAT_FREQUENCY_SECS for more information.
15:43:40.672 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : acat-xx:6627
No topologies running.
aaa@qq.com:storm-starter$ cd target/
aaa@qq.com:target$ ls
checkstyle-cachefile checkstyle-violation.xml generated-sources maven-archiver maven-status storm-starter-2.1.0.jar test-classes
checkstyle-checker.xml classes generated-test-sources maven-shared-archive-resources original-storm-starter-2.1.0.jar surefire-reports test-reports
aaa@qq.com:target$ storm jar storm-starter-2.1.0.jar org.apache.storm.starter.WordCountTopology
Running: /home/acat/softwares/jdk1.8.0_161/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64 -Dstorm.conf.file= -cp /usr/local/storm/*:/usr/local/storm/lib/*:/usr/local/storm/extlib/*:storm-starter-2.1.0.jar:/usr/local/storm/conf:/usr/local/storm/bin: -Dstorm.jar=storm-starter-2.1.0.jar -Dstorm.dependency.jars= -Dstorm.dependency.artifacts={} org.apache.storm.starter.WordCountTopology
15:44:58.687 [main] INFO o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -5591305942953417675:-7620622579961152904
15:44:58.741 [main] WARN o.a.s.v.ConfigValidation - task.heartbeat.frequency.secs is a deprecated config please see class org.apache.storm.Config.TASK_HEARTBEAT_FREQUENCY_SECS for more information.
15:44:58.807 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : acat-xx:6627
15:44:58.808 [main] INFO o.a.s.s.a.ClientAuthUtils - Got AutoCreds []
15:44:58.831 [main] INFO o.a.s.StormSubmitter - Uploading dependencies - jars...
15:44:58.831 [main] INFO o.a.s.StormSubmitter - Uploading dependencies - artifacts...
15:44:58.832 [main] INFO o.a.s.StormSubmitter - Dependency Blob keys - jars : [] / artifacts : []
15:44:58.837 [main] INFO o.a.s.StormSubmitter - Uploading topology jar storm-starter-2.1.0.jar to assigned location: /usr/local/storm/data/storm/nimbus/inbox/stormjar-a737c857-311a-4916-9072-0873dbe78f5d.jar
15:44:59.355 [main] INFO o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /usr/local/storm/data/storm/nimbus/inbox/stormjar-a737c857-311a-4916-9072-0873dbe78f5d.jar
15:44:59.355 [main] INFO o.a.s.StormSubmitter - Submitting topology word-count in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-5591305942953417675:-7620622579961152904","topology.workers":3,"topology.debug":true}
15:44:59.730 [main] INFO o.a.s.StormSubmitter - Finished submitting topology: word-count
aaa@qq.com:target$
Then open http://localhost:8081 to check. The Topology Summary section now lists the word-count topology.
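⑥ Set up the Storm development environment (IDEA + Maven).
As shown in the figure below, the Community edition of IDEA is used; it can be used without registration.
Installing and configuring Maven was covered above and is not repeated here.
Maven settings in IDEA: select the Maven installed under /usr/share/maven, set the settings file to /home/acat/.m2/settings.xml, and set the local repository to /home/acat/.m2/repository.
With that, the environment is essentially ready.
⑦ Implement frequent-keyword discovery on Storm.
Because we are processing stream data, the first question is where the stream comes from. So we start by defining a SentenceSpout class, which extends BaseRichSpout and emits strings one after another. Note that each emitted tuple is a whole sentence, while what we want is individual words.
So next we define a SplitBolt class that splits each sentence into words and emits them one by one. With SplitBolt emitting single words, the natural next step is to count each word. For that we define a WordCountBolt class that holds a data structure (a HashMap) whose key is the word and whose value is the number of times the word has appeared.
At this point the counting is done, but how can the user see the frequency of each word on screen? The next step is to move the key/value pairs from memory to the screen, so we define a ReportBolt class to display the results. It can also hold some processing logic, here sorting the words by frequency.
Finally, with all components defined, we write a main class with a main function that binds the components together, like joining the sections of a water pipe, so that the stream data flows through and the keyword frequencies are analyzed. The program code is listed in the Code section.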
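The data flow described above (sentence, then words, then counts, then a sorted report) can be tried out without a Storm cluster. The following is a minimal sketch in plain Java that mirrors the four stages as ordinary methods. It is only an illustration of the logic: the class and method names are hypothetical, and nothing here uses the Storm API or runs in parallel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-threaded model of the spout -> split -> count -> report flow. */
final class PipelineSketch {

    /** Stage 2 (SplitBolt): one sentence in, its words out. */
    static List<String> split(String sentence) {
        return Arrays.asList(sentence.split(" "));
    }

    /** Stage 3 (WordCountBolt): running count per word, in first-seen order. */
    static Map<String, Long> count(List<String> sentences) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String sentence : sentences) {
            for (String word : split(sentence)) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    /** Stage 4 (ReportBolt): entries sorted by count, ascending. */
    static List<Map.Entry<String, Long>> report(Map<String, Long> counts) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.comparingByValue());
        return entries;
    }

    public static void main(String[] args) {
        // Stage 1 (SentenceSpout): the same three sentences as in the Code section.
        List<String> sentences = Arrays.asList(
                "my dog has fleas",
                "a bird fly to the sky",
                "my bird has fleas");
        for (Map.Entry<String, Long> entry : report(count(sentences))) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }
}
```

With these three sentences, the four words that occur twice (my, has, fleas, bird) end up at the bottom of the report, after the words that occur once.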
Once the code is written, the jar can be built with Maven. The project's pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>storm</artifactId>
    <groupId>org.apache.storm</groupId>
    <version>2.1.0</version>
    <!-- <relativePath>../../pom.xml</relativePath>-->
  </parent>
  <artifactId>my_storm_test</artifactId>
  <packaging>jar</packaging>
  <name>my_storm_test</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hbase.version>0.98.4-hadoop2</hbase.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.hdrhistogram</groupId>
      <artifactId>HdrHistogram</artifactId>
    </dependency>
    <dependency>
      <groupId>org.testng</groupId>
      <artifactId>testng</artifactId>
      <version>6.8.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>java-hamcrest</artifactId>
    </dependency>
    <dependency>
      <groupId>org.easytesting</groupId>
      <artifactId>fest-assert-core</artifactId>
      <version>2.0M8</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.jmock</groupId>
      <artifactId>jmock</artifactId>
      <version>2.6.0</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-clojure</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-clojure-test</artifactId>
      <version>${project.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-client</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-client</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>multilang-javascript</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>multilang-ruby</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>multilang-python</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-metrics</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-hbase</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-redis</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.clojure</groupId>
      <artifactId>clojure</artifactId>
      <version>1.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-server</artifactId>
      <version>2.1.0</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/java</sourceDirectory>
  </build>
</project>
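Then build the package with Maven.
After the build succeeds, the result is as shown in the figure below.
The generated jar appears in the /home/acat/IdeaProjects/my_storm_test/target directory.
Then test it with Storm.
As the output shows, the keywords are sorted by how often they occur (in ascending order).
Code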
SentenceSpout.java
package com.test;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int index = 0;
    private String[] sentences = {
        "my dog has fleas",
        "a bird fly to the sky",
        "my bird has fleas"
    };

    @Override
    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Each sentence is emitted exactly once; afterwards there is nothing left to send.
        if (index >= sentences.length) {
            return;
        }
        // Emit one sentence as a single-field tuple.
        this.collector.emit(new Values(sentences[index]));
        index++;
        Utils.sleep(1);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}
SplitBolt.java
package com.test;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

@SuppressWarnings("serial")
public class SplitBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declares the field that is passed on to the next bolt.
        declarer.declare(new Fields("word"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            // Emit one tuple per word.
            collector.emit(new Values(word));
        }
    }
}
WordCountBolt.java
package com.test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {

    private OutputCollector collector;
    // word -> number of times the word has been seen so far
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        System.out.println("--------------" + word + "--------------------------");
        // Increment the running count, starting from 0 for a new word.
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        // Pass the word and its updated count downstream.
        this.collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
ReportBolt.java
package com.test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.*;

@SuppressWarnings("serial")
public class ReportBolt extends BaseRichBolt {
    // word -> latest count received from WordCountBolt
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Last bolt in the topology: it emits nothing, so no fields are declared.
    }

    // Note: Storm only guarantees that cleanup() is called in local mode.
    @Override
    public void cleanup() {
        // Sort the keywords by number of occurrences, ascending.
        System.out.println("--- FINAL COUNT ---");
        List<Map.Entry<String, Long>> list = new ArrayList<Map.Entry<String, Long>>(this.counts.entrySet());
        Collections.sort(list, new Comparator<Map.Entry<String, Long>>() {
            @Override
            public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
                return o1.getValue().compareTo(o2.getValue());
            }
        });
        for (Map.Entry<String, Long> mapping : list) {
            System.out.println(mapping.getKey() + " : " + mapping.getValue());
        }
        // List<String> keys = new ArrayList<String>();
        // keys.addAll(this.counts.keySet());
        //
        // for(String key:keys){
        //     System.out.println(key + " : " + this.counts.get(key));
        // }
        System.out.println("-------------------------------------");
    }
}
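ReportBolt prints the counts in ascending order, so the most frequent keywords come last. If the most frequent ones should come first, or only the top few are wanted, the comparator can be reversed. The sketch below shows one way to do that in plain Java; the class and method names are hypothetical, and the tie-break by word is an added assumption that makes the output order deterministic.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper: formats the n most frequent words, highest count first. */
final class TopWords {

    static List<String> topN(Map<String, Long> counts, int n) {
        return counts.entrySet().stream()
                // Highest count first; equal counts are ordered alphabetically by word.
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(n)
                .map(e -> e.getKey() + " : " + e.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Sample counts in the shape ReportBolt collects them (word -> count).
        Map<String, Long> counts = new HashMap<>();
        counts.put("my", 2L);
        counts.put("dog", 1L);
        counts.put("has", 2L);
        counts.put("fleas", 2L);
        counts.put("bird", 2L);
        counts.put("sky", 1L);
        for (String line : topN(counts, 3)) {
            System.out.println(line);
        }
    }
}
```

The same comparator could be passed to Collections.sort inside cleanup() if descending output is preferred there.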
MyTopology.java
package com.test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class MyTopology {
    public static void main(String[] args) throws Exception {
        // Wire the components: spout -> split -> count -> report.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentence-spout", new SentenceSpout(), 1);
        builder.setBolt("split-bolt", new SplitBolt(), 2).shuffleGrouping("sentence-spout");
        builder.setBolt("count-bolt", new WordCountBolt()).fieldsGrouping("split-bolt", new Fields("word"));
        builder.setBolt("report-bolt", new ReportBolt()).globalGrouping("count-bolt");
        Config conf = new Config();
        conf.setDebug(false);

        if (args != null && args.length > 0) {
            // Cluster mode: the first argument is the topology name.
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Local mode: run for 5 seconds in an in-process cluster, then shut down.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count-test", conf, builder.createTopology());
            Thread.sleep(5000);
            cluster.shutdown();
        }
    }
}
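MyTopology connects split-bolt to count-bolt with fieldsGrouping on "word". The point of that grouping is that tuples with the same word always reach the same WordCountBolt task, which matters as soon as count-bolt runs with more than one task, because each task keeps its own HashMap. The sketch below illustrates the idea with a hash modulo the task count. It is a simplified model under that assumption, not Storm's actual routing code, and the class and method names are hypothetical.

```java
/** Hypothetical, simplified model of routing by field value. */
final class FieldsGroupingSketch {

    /** Picks a task index for a word; the same word always maps to the same index. */
    static int taskFor(String word, int numTasks) {
        // floorMod keeps the result in [0, numTasks) even for negative hash codes.
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3;
        String[] words = {"my", "bird", "has", "fleas", "my", "bird"};
        for (String word : words) {
            System.out.println(word + " -> task " + taskFor(word, numTasks));
        }
    }
}
```

By contrast, shuffleGrouping (used between the spout and split-bolt) spreads tuples without regard to their content, which is fine there because splitting a sentence needs no shared state.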