
Stream Data Processing and Analysis


Environment

Name            Version / Spec
OS              Ubuntu 18.04.4 LTS
Memory          7.5 GiB
Processor       Intel Core i7-8565U CPU @ 1.80GHz × 8
Graphics        Intel UHD Graphics (Whiskey Lake 3x8 GT2)
GNOME           3.28.2
OS type         64-bit
Disk            251.0 GB
Storm           2.1.0
ZooKeeper       3.5.8
IntelliJ IDEA   Community 2020.1
Maven           3.6.0
 

Steps

① Install ZooKeeper.
ZooKeeper acts as the coordinator: in a distributed deployment it coordinates the work of the individual Storm nodes, and it plays an indispensable role there.
First download the pre-built package, apache-zookeeper-3.5.8-bin.tar.gz, extract it to /usr/local, and rename the directory to zookeeper. Then make a copy of zoo_sample.cfg in the conf directory named zoo.cfg; this is ZooKeeper's configuration file. Edit it so that it looks like the following.
 

# The number of milliseconds of each tick
tickTime=8000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/usr/local/zookeeper/tmp
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

admin.serverPort=8080

zookeeper.client.sasl=false

② Install and configure Storm.
Download the latest Storm release from the official site; the package is named apache-storm-2.1.0.tar.gz. Extract it to /usr/local and rename the directory to storm. Then edit Storm's configuration, starting with storm.yaml. Port 2181 is configured here because that is the port ZooKeeper opens to clients. The local data directory is set to /usr/local/storm/data/storm, and the UI port to 8081: after starting the UI with the storm ui command, you can see the web interface by browsing to port 8081.
 

storm.zookeeper.servers:
    - "localhost"

nimbus.seeds: ["localhost"]

storm.local.dir: "/usr/local/storm/data/storm"

nimbus.host: "localhost"

storm.zookeeper.port: 2181

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

ui.port: 8081

storm.zookeeper.session.timeout: 120000

storm.zookeeper.connection.timeout: 90000
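
A note on this file: nimbus.host is a legacy setting from older Storm releases; Storm 2.1 locates the master through nimbus.seeds, so the nimbus.host line above is redundant, though harmless, here.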

Then edit the storm-env.sh file and add a line that configures the JDK path.

export JAVA_HOME=${JAVA_HOME}
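
export JAVA_HOME=${JAVA_HOME} only takes effect if JAVA_HOME is already exported in the environment that launches the Storm daemons. If it is not, pin the path explicitly; for example, using the JDK location that appears in the command output later in this article:

export JAVA_HOME=/home/acat/softwares/jdk1.8.0_161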

To make the commands easier to use, add two lines to the ~/.bashrc file:

export PATH=/usr/local/zookeeper/bin:$PATH
export PATH=/usr/local/storm/bin:$PATH
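
Then reload the shell configuration so the new PATH takes effect in the current terminal:

source ~/.bashrc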

③ Install Maven.
On Ubuntu, running sudo apt-get install maven installs Maven. After installation you will find a maven folder under /usr/share; this is the Maven installation directory. Then copy settings.xml into /home/acat/.m2 and edit it. localRepository specifies the local repository directory, and as the mirror element shows, the Aliyun mirror is used to speed up jar downloads.
 

<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
  <localRepository>/home/acat/.m2/repository</localRepository>
  <pluginGroups>
  </pluginGroups>

  <proxies>
  </proxies>
  <servers>
  </servers>

  <mirrors>
    <mirror>
      <id>nexus-aliyun</id>
      <mirrorOf>*,!jeecg,!jeecg-snapshots</mirrorOf>
      <name>Nexus aliyun</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>
  </mirrors>
  <profiles>
  </profiles>
</settings>
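
To confirm that Maven picks this file up, you can print the effective settings and check that the local repository and the Aliyun mirror shown above appear in the output:

acat@acat-xx:~$ mvn help:effective-settings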

④ Test whether the Storm environment was set up successfully.
First check which Java processes are currently running.
 

acat@acat-xx:~$ jps
9117 Jps

Then start ZooKeeper.

acat@acat-xx:~$ zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
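
Optionally, confirm that ZooKeeper is up before starting the Storm daemons; in this single-node setup it should report standalone mode:

acat@acat-xx:~$ zkServer.sh status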

Next, start the supervisor node.

acat@acat-xx:~$ storm supervisor
Running: /home/acat/softwares/jdk1.8.0_161/bin/java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64 -Dstorm.conf.file= -cp /usr/local/storm/*:/usr/local/storm/lib/*:/usr/local/storm/extlib/*:/usr/local/storm/extlib-daemon/*:/usr/local/storm/conf -Xmx256m -Djava.deserialization.disabled=true -Dlogfile.name=supervisor.log -Dlog4j.configurationFile=/usr/local/storm/log4j2/cluster.xml org.apache.storm.daemon.supervisor.Supervisor

Then start the nimbus node.

acat@acat-xx:~$ storm nimbus
Running: /home/acat/softwares/jdk1.8.0_161/bin/java -server -Ddaemon.name=nimbus -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64 -Dstorm.conf.file= -cp /usr/local/storm/*:/usr/local/storm/lib/*:/usr/local/storm/extlib/*:/usr/local/storm/extlib-daemon/*:/usr/local/storm/conf -Xmx1024m -Djava.deserialization.disabled=true -Dlogfile.name=nimbus.log -Dlog4j.configurationFile=/usr/local/storm/log4j2/cluster.xml org.apache.storm.daemon.nimbus.Nimbus

Next, start the UI process.

acat@acat-xx:~$ storm ui
Running: /home/acat/softwares/jdk1.8.0_161/bin/java -server -Ddaemon.name=ui -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64 -Dstorm.conf.file= -cp /usr/local/storm/*:/usr/local/storm/lib/*:/usr/local/storm/extlib/*:/usr/local/storm/extlib-daemon/*:/usr/local/storm/lib-webapp/*:/usr/local/storm/conf -Xmx768m -Djava.deserialization.disabled=true -Dlogfile.name=ui.log -Dlog4j.configurationFile=/usr/local/storm/log4j2/cluster.xml org.apache.storm.daemon.ui.UIServer

Then check the running Java processes again.

acat@acat-xx:~$ jps
9154 QuorumPeerMain
9450 Nimbus
10187 Jps
9771 UIServer
9196 Supervisor
acat@acat-xx:~$

As you can see, QuorumPeerMain is the ZooKeeper process, and the Nimbus, UIServer, and Supervisor processes are all present as well. You can verify further by visiting http://localhost:8081 in a browser.

[Figure: the Storm UI in the browser at http://localhost:8081]

⑤ Run the bundled WordCount example.
First change into the storm-starter project directory, which ships with the Storm source distribution. Before building, check the version of the installed Maven.
 

acat@acat-xx:~$ mvn --version
Apache Maven 3.6.0
Maven home: /usr/share/maven
Java version: 1.8.0_161, vendor: Oracle Corporation, runtime: /home/acat/softwares/jdk1.8.0_161/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "5.3.0-59-generic", arch: "amd64", family: "unix"

Then build and package the project with the mvn clean package command.

acat@acat-xx:storm-starter$ mvn clean package
[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.apache.storm:storm-starter >-------------------
[INFO] Building storm-starter 2.1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ storm-starter ---
[INFO] Deleting /home/acat/下载/apache-storm-2.1.0-src/apache-storm-2.1.0/storm-starter/target
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (cleanup) @ storm-starter ---
[INFO]
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven-version) @ storm-starter ---
[INFO]
[INFO] --- maven-checkstyle-plugin:3.0.0:check (validate) @ storm-starter ---
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ storm-starter ---
[INFO]
[INFO] --- maven-resources-plugin:3.1.0:resources (default-resources) @ storm-starter ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 5 resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.7.0:compile (default-compile) @ storm-starter ---
[INFO] Changes detected - recompiling the module!
... (several lines omitted) ...
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /home/acat/下载/apache-storm-2.1.0-src/apache-storm-2.1.0/storm-starter/target/storm-starter-2.1.0.jar with /home/acat/下载/apache-storm-2.1.0-src/apache-storm-2.1.0/storm-starter/target/storm-starter-2.1.0-shaded.jar
[INFO] Dependency-reduced POM written at: /home/acat/下载/apache-storm-2.1.0-src/apache-storm-2.1.0/storm-starter/dependency-reduced-pom.xml
[INFO]
[INFO] --- maven-site-plugin:3.7.1:attach-descriptor (attach-descriptor) @ storm-starter ---
[INFO] Skipping because packaging 'jar' is not pom.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  47.478 s
[INFO] Finished at: 2020-06-24T15:32:34+08:00
[INFO] ------------------------------------------------------------------------

Run the bundled word count example. (The session below first kills a word-count topology left over from an earlier run, confirms with storm list that nothing is running, and then submits the topology from the freshly built jar.)

acat@acat-xx:storm-starter$ storm kill word-count -w 2
Running: /home/acat/softwares/jdk1.8.0_161/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64 -Dstorm.conf.file= -cp /usr/local/storm/*:/usr/local/storm/lib/*:/usr/local/storm/extlib/*:/usr/local/storm/extlib-daemon/*:/usr/local/storm/conf:/usr/local/storm/bin org.apache.storm.command.KillTopology word-count -w 2
15:43:34.444 [main] WARN  o.a.s.v.ConfigValidation - task.heartbeat.frequency.secs is a deprecated config please see class org.apache.storm.Config.TASK_HEARTBEAT_FREQUENCY_SECS for more information.
15:43:34.489 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : acat-xx:6627
15:43:34.502 [main] INFO  o.a.s.c.KillTopology - Killed topology: word-count
acat@acat-xx:storm-starter$ storm list
Running: /home/acat/softwares/jdk1.8.0_161/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64 -Dstorm.conf.file= -cp /usr/local/storm/*:/usr/local/storm/lib/*:/usr/local/storm/extlib/*:/usr/local/storm/extlib-daemon/*:/usr/local/storm/conf:/usr/local/storm/bin org.apache.storm.command.ListTopologies
15:43:40.588 [main] WARN  o.a.s.v.ConfigValidation - task.heartbeat.frequency.secs is a deprecated config please see class org.apache.storm.Config.TASK_HEARTBEAT_FREQUENCY_SECS for more information.
15:43:40.672 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : acat-xx:6627
No topologies running.
acat@acat-xx:storm-starter$ cd target/
acat@acat-xx:target$ ls
checkstyle-cachefile    checkstyle-violation.xml  generated-sources       maven-archiver                  maven-status                      storm-starter-2.1.0.jar  test-classes
checkstyle-checker.xml  classes                   generated-test-sources  maven-shared-archive-resources  original-storm-starter-2.1.0.jar  surefire-reports         test-reports
acat@acat-xx:target$ storm jar storm-starter-2.1.0.jar org.apache.storm.starter.WordCountTopology
Running: /home/acat/softwares/jdk1.8.0_161/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64 -Dstorm.conf.file= -cp /usr/local/storm/*:/usr/local/storm/lib/*:/usr/local/storm/extlib/*:storm-starter-2.1.0.jar:/usr/local/storm/conf:/usr/local/storm/bin: -Dstorm.jar=storm-starter-2.1.0.jar -Dstorm.dependency.jars= -Dstorm.dependency.artifacts={} org.apache.storm.starter.WordCountTopology
15:44:58.687 [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -5591305942953417675:-7620622579961152904
15:44:58.741 [main] WARN  o.a.s.v.ConfigValidation - task.heartbeat.frequency.secs is a deprecated config please see class org.apache.storm.Config.TASK_HEARTBEAT_FREQUENCY_SECS for more information.
15:44:58.807 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : acat-xx:6627
15:44:58.808 [main] INFO  o.a.s.s.a.ClientAuthUtils - Got AutoCreds []
15:44:58.831 [main] INFO  o.a.s.StormSubmitter - Uploading dependencies - jars...
15:44:58.831 [main] INFO  o.a.s.StormSubmitter - Uploading dependencies - artifacts...
15:44:58.832 [main] INFO  o.a.s.StormSubmitter - Dependency Blob keys - jars : [] / artifacts : []
15:44:58.837 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar storm-starter-2.1.0.jar to assigned location: /usr/local/storm/data/storm/nimbus/inbox/stormjar-a737c857-311a-4916-9072-0873dbe78f5d.jar
15:44:59.355 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /usr/local/storm/data/storm/nimbus/inbox/stormjar-a737c857-311a-4916-9072-0873dbe78f5d.jar
15:44:59.355 [main] INFO  o.a.s.StormSubmitter - Submitting topology word-count in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-5591305942953417675:-7620622579961152904","topology.workers":3,"topology.debug":true}
15:44:59.730 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: word-count
acat@acat-xx:target$

Then open http://localhost:8081 in the browser to check. As you can see, the word-count topology now appears under Topology Summary.

[Figure: Storm UI with word-count listed under Topology Summary]

⑥ Set up the Storm development environment (IDEA + Maven).
As shown below, the IDEA Community edition is used; it requires no registration.
[Figure: IntelliJ IDEA Community 2020.1]

The installation and configuration of Maven were covered above, so they are not repeated here.
For IDEA's Maven settings, select the Maven installed under /usr/share/maven, point the user settings file to /home/acat/.m2/settings.xml, and set the local repository to /home/acat/.m2/repository.
[Figure: IDEA Maven settings]

At this point the environment is essentially ready.
⑦ Implement frequent-keyword detection on Storm.
Since we are processing stream data, the first question to ask is: where does the stream come from? We therefore start by defining a SentenceSpout class, which extends BaseRichSpout and emits sentence strings one after another. Note that each emission is a whole string, while what we ultimately want is individual words, so we next define a SplitBolt class that splits each sentence into words and emits them one by one. With SplitBolt emitting individual words, the natural next step is to count them, so we define a WordCountBolt class holding a HashMap whose keys are words and whose values are the number of times each word has appeared. Counting is now done, but how is the user supposed to see the word frequencies on screen? The next step is therefore to move the key/value pairs from memory to the display: we define a ReportBolt class to present the results, and it can also carry extra logic such as sorting the keywords by frequency. Finally, with all the components defined, we write a main class whose main() method wires them together, like joining sections of a pipe, so that the stream flows through and keyword frequencies are computed. The full program code is given in the Code section below.
With the code written, the jar can be built with Maven. The pom.xml for this project is as follows:
 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>storm</artifactId>
        <groupId>org.apache.storm</groupId>
        <version>2.1.0</version>
<!--        <relativePath>../../pom.xml</relativePath>-->
    </parent>
    <artifactId>my_storm_test</artifactId>
    <packaging>jar</packaging>
    <name>my_storm_test</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hbase.version>0.98.4-hadoop2</hbase.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.hdrhistogram</groupId>
            <artifactId>HdrHistogram</artifactId>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>6.8.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>java-hamcrest</artifactId>
        </dependency>
        <dependency>
            <groupId>org.easytesting</groupId>
            <artifactId>fest-assert-core</artifactId>
            <version>2.0M8</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.jmock</groupId>
            <artifactId>jmock</artifactId>
            <version>2.6.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-clojure</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-clojure-test</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-client</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-client</artifactId>
            <version>${project.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>multilang-javascript</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>multilang-ruby</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>multilang-python</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-metrics</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-redis</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.clojure</groupId>
            <artifactId>clojure</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-server</artifactId>
            <version>2.1.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
    </build>
</project>

Then build the jar with Maven.

[Figure: running the Maven package build in IDEA]
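
For reference, the command-line equivalent of the IDE build (a sketch, assuming the project path shown below) would be:

acat@acat-xx:~$ cd /home/acat/IdeaProjects/my_storm_test
acat@acat-xx:my_storm_test$ mvn clean package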

After the build succeeds, it looks like the following.

[Figure: BUILD SUCCESS output]

The jar is generated under /home/acat/IdeaProjects/my_storm_test/target.

[Figure: the generated jar in the target directory]

Then test it with Storm.

[Figure: running the topology with the storm command]
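
The run corresponds to a command along these lines (a sketch: the jar name assumes Maven's usual artifactId-version naming from the pom.xml above; with no command-line argument, MyTopology runs in local mode and prints the final counts to the console, while adding an argument would submit it to the cluster under that name):

acat@acat-xx:target$ storm jar my_storm_test-2.1.0.jar com.test.MyTopology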

As you can see, the keywords are now sorted by frequency of occurrence.

[Figure: final word counts printed in ascending order of frequency]

Code

SentenceSpout.java

package com.test;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int index = 0;
    private String[] sentences = {
            "my dog has fleas",
            "a bird fly to the sky",
            "my bird has fleas"
    };

    @Override
    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Stop emitting once every sentence has been sent
        if (index >= sentences.length) {
            return;
        }
        // Emit one whole sentence per call
        this.collector.emit(new Values(sentences[index]));
        index++;
        Utils.sleep(1);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}
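
One thing worth noting about this spout: the tuples are emitted without a message ID, so Storm treats them as unreliable and will not replay them on failure, which is acceptable for a demo like this one.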

SplitBolt.java

package com.test;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

@SuppressWarnings("serial")
public class SplitBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the field name seen by the next bolt
        declarer.declare(new Fields("word"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            // Emit each word as its own tuple
            collector.emit(new Values(word));
        }
    }
}

WordCountBolt.java

package com.test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {

    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        System.out.println("--------------" + word + "--------------------------");
        // Increment the running count for this word
        Long count = this.counts.get(word);
        if (count == null) count = 0L;
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
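
Each WordCountBolt task keeps its counts in a task-local HashMap. The totals are still correct because MyTopology (below) wires this bolt with fieldsGrouping on the "word" field, so every occurrence of a given word is routed to the same task.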

ReportBolt.java

package com.test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.*;

@SuppressWarnings("serial")
public class ReportBolt extends BaseRichBolt {
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        // Record the latest count received for each word
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Terminal bolt: emits nothing downstream
    }

    @Override
    public void cleanup() {
        // Print the keywords sorted by occurrence count, ascending
        System.out.println("--- FINAL COUNT ---");
        List<Map.Entry<String, Long>> list = new ArrayList<Map.Entry<String, Long>>(this.counts.entrySet());
        Collections.sort(list, new Comparator<Map.Entry<String, Long>>() {
            @Override
            public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
                return o1.getValue().compareTo(o2.getValue());
            }
        });
        for (Map.Entry<String, Long> mapping : list) {
            System.out.println(mapping.getKey() + " : " + mapping.getValue());
        }
        System.out.println("-------------------------------------");
    }
}
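
As an aside, on Java 8 the ascending sort in cleanup() could be written more compactly with streams; a minimal equivalent sketch:

// Java 8 variant: print word counts in ascending order of count
this.counts.entrySet().stream()
        .sorted(Map.Entry.comparingByValue())
        .forEach(e -> System.out.println(e.getKey() + " : " + e.getValue()));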

MyTopology.java

package com.test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class MyTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentence-spout", new SentenceSpout(), 1);
        builder.setBolt("split-bolt", new SplitBolt(), 2).shuffleGrouping("sentence-spout");
        // fieldsGrouping routes identical words to the same WordCountBolt task
        builder.setBolt("count-bolt", new WordCountBolt()).fieldsGrouping("split-bolt", new Fields("word"));
        builder.setBolt("report-bolt", new ReportBolt()).globalGrouping("count-bolt");
        Config conf = new Config();
        conf.setDebug(false);

        if (args != null && args.length > 0) {
            // Cluster mode: submit under the name given on the command line
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Local mode: run in-process for a few seconds, then shut down
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count-test", conf, builder.createTopology());
            Thread.sleep(5000);
            cluster.shutdown();
        }
    }
}

 
