欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

基于Storm的WordCount

程序员文章站 2022-04-10 14:09:03
Storm WordCount 工作过程 Storm 版本: 1、Spout 从外部数据源中读取数据,随机发送一个元组对象出去; 2、SplitBolt 接收 Spout 中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去; 3、WordCountBolt 接收 SplitBolt ......

storm wordcount 工作过程

storm 版本:
1、spout 从外部数据源中读取数据,随机发送一个元组对象出去;
2、splitbolt 接收 spout 中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去;
3、wordcountbolt 接收 splitbolt 中输出的单词数组,对里面单词的频率进行累加,将累加后的结果输出。

java 版本:
1、读取文件中的数据,一行一行的读取;
2、将读到的数据进行切割;
3、对切割后的数组中的单词进行计算。

hadoop 版本:
1、按行读取文件中的数据;
2、在 mapper()函数中对每一行的数据进行切割,并输出切割后的数据数组;
3、接收 mapper()中输出的数据数组,在 reducer()函数中对数组中的单词进行计算,将计算后的统计结果输出。

源代码

storm的配置、eclipse里maven的配置以及创建项目部分省略。

mainclass

package com.test.stormwordcount;
import backtype.storm.config; 
import backtype.storm.localcluster; 
import backtype.storm.stormsubmitter; 
import backtype.storm.generated.alreadyaliveexception; 
import backtype.storm.generated.invalidtopologyexception; 
import backtype.storm.topology.topologybuilder; 
import backtype.storm.tuple.fields; 

public class mainclass { 

    public static void main(string[] args) throws alreadyaliveexception, invalidtopologyexception {         
        //创建一个 topologybuilder         
        topologybuilder tb = new topologybuilder();         
        tb.setspout("spoutbolt", new spoutbolt(), 2);         tb.setbolt("splitbolt", new splitbolt(), 2).shufflegrouping("spoutbolt");         
        tb.setbolt("countbolt", new countbolt(), 4).fieldsgrouping("splitbolt", new fields("word"));         
        //创建配置         
        config conf = new config();         
        //设置 worker 数量         
        conf.setnumworkers(2);         
        //提交任务         
        //集群提交         
        //stormsubmitter.submittopology("mywordcount", conf, tb.createtopology());         
        //本地提交         
        localcluster localcluster = new localcluster();         
        localcluster.submittopology("mywordcount", conf, tb.createtopology()); 
    }  
} 

splitbolt 部分

package com.test.stormwordcount;
import java.util.map; 
import backtype.storm.task.outputcollector; 
import backtype.storm.task.topologycontext; 
import backtype.storm.topology.outputfieldsdeclarer; 
import backtype.storm.topology.base.baserichbolt; 
import backtype.storm.tuple.fields; 
import backtype.storm.tuple.tuple; 
import backtype.storm.tuple.values; 

public class splitbolt extends baserichbolt{      
    outputcollector collector; 

    /**      * 初始化      */     
    public void prepare(map stormconf, topologycontext context, outputcollector collector) {         
        this.collector = collector;     
        } 

    /**      * 执行方法      */     
    public void execute(tuple input) {         
        string line = input.getstring(0);         
        string[] split = line.split(" ");         
        for (string word : split) {             
            collector.emit(new values(word));         
            }     
        } 

    /**      * 输出      */     
    public void declareoutputfields(outputfieldsdeclarer declarer) {         
        declarer.declare(new fields("word"));     
        } 
} 

countbolt 部分

package com.test.stormwordcount;
import java.util.hashmap; 
import java.util.map; 
import backtype.storm.task.outputcollector; 
import backtype.storm.task.topologycontext; 
import backtype.storm.topology.outputfieldsdeclarer; 
import backtype.storm.topology.base.baserichbolt; 
import backtype.storm.tuple.tuple; 

public class countbolt extends baserichbolt{ 

    outputcollector collector;
    map<string, integer> map = new hashmap<string, integer>(); 

    /**      * 初始化      */     
    public void prepare(map stormconf, topologycontext context, outputcollector collector) {         
        this.collector = collector;     
        } 


    /**      * 执行方法      */     
public void execute(tuple input) {         
    string word = input.getstring(0);         
    if(map.containskey(word)){             
    integer c = map.get(word);             
        map.put(word, c+1);         
        }else{             
        map.put(word, 1);         
        }         
    //测试输出         
    system.out.println("结果:"+map);     
    } 

    /**      * 输出      */     
public void declareoutputfields(outputfieldsdeclarer declarer) {     
    
} 
} 

spoutbolt 部分

package com.test.stormwordcount;
import java.util.map; 
import backtype.storm.spout.spoutoutputcollector; 
import backtype.storm.task.topologycontext; 
import backtype.storm.topology.outputfieldsdeclarer; 
import backtype.storm.topology.base.baserichspout; 
import backtype.storm.tuple.fields; 
import backtype.storm.tuple.values; 

public class spoutbolt extends baserichspout{ 

    spoutoutputcollector collector;
    /**      * 初始化方法      */     
    public void open(map map, topologycontext context, spoutoutputcollector collector) {         
        this.collector = collector;     
        } 

    /**      * 重复调用方法      */     
    public void nexttuple() {         
        collector.emit(new values("hello world this is a test"));     
        } 

    /**      * 输出      */     
    public void declareoutputfields(outputfieldsdeclarer declarer) {         
        declarer.declare(new fields("test"));     
        } 
} 

pom.xml 文件内容

<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelversion>4.0.0</modelversion>

<groupid>com.test</groupid>
<artifactid>stormwordcount</artifactid>
<version>0.9.6</version>
<packaging>jar</packaging>

<name>stormwordcount</name>
<url>http://maven.apache.org</url>

<properties>
    <project.build.sourceencoding>utf-8</project.build.sourceencoding>
</properties>
<dependencies>
    <dependency>
        <groupid>junit</groupid>
        <artifactid>junit</artifactid>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupid>org.apache.storm</groupid>
        <artifactid>storm-core</artifactid>
        <version>0.9.6</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactid>maven-assembly-plugin</artifactid>
            <configuration>
                <descriptorrefs>
                    <descriptorref>jar-with-dependencies</descriptorref>
                </descriptorrefs>
                <archive>
                    <manifest>
                        <mainclass>com.test.stormwordcount.mainclass</mainclass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupid>org.apache.maven.plugins</groupid>
            <artifactid>maven-compiler-plugin</artifactid>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
    </plugins>
</build>

遇到的问题

基于storm的wordcount需要eclipse安装了maven插件,之前的大数据实践安装的eclipse版本为eclipse ide for eclipse committers4.5.2,这个版本不自带maven插件,后续安装失败了几次(网上很多的教程都已经失效),这里分享一下我成功安装的方法:
使用链接下载,help->install new software
基于Storm的WordCount
点击add,name输入随意,在location输入下载eclipse的maven插件,下载地址可以这样获取
点击连接: 进入网站后点击download,拉到最下面可以看到很多eclipse maven插件的版本和发布时间,选在适合eclipse的版本复制链接即可。建议取消选中contack all update sites during install to find required software(耗时太久)。

但是安装成功后还是无法配置(这里原因不太清楚,没找到解决办法),就直接上官网换成自己maven插件的javaee ide了...

后续的maven的配置这些都比较顺利,第一次创建maven-archetype-quickstat项目报错,试了网上很多办法都还没成功,然后打开 windows->preferencs->maven->installation发现之前配置了的maven的安装路径没了...重新配置了下就可以创建项目了。

最后运行成功的结果:
基于Storm的WordCount