欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

storm程序-单词统计wordcount

程序员文章站 2022-05-01 11:02:00
...

spout代码:

public class MyLocalFileSpout extends BaseRichSpout {
    public static final String FILE_PATH = "/root/1.log";
//    public static final String FILE_PATH = "D:\\1.log";
    private SpoutOutputCollector collector;
    private BufferedReader bufferedReader;

    //初始化方法
    //该方法只会被调用一次,用来初始化
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            this.bufferedReader = new BufferedReader(new FileReader(new File(FILE_PATH)));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

    }

    //Storm实时计算的特性就是对数据一条一条的处理
    //while(true){
    // this.nextTuple()
    // }
    public void nextTuple() {
        // 每被调用一次就会发送一条数据出去
        try {
            String line = bufferedReader.readLine();
            if (StringUtils.isNotBlank(line)) {
                List<Object> arrayList = new ArrayList<Object>();
                arrayList.add(line);
                collector.emit(arrayList);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //消息源可以发射多条消息流stream。多条消息流可以理解为多中类型的数据。
        declarer.declare(new Fields("juzi"));
    }
}

bolt1 句子分割成单词代码:

/**
 * Map --->word,1
 */
public class MySplitBolt extends BaseBasicBolt {

    public void execute(Tuple input, BasicOutputCollector collector) {
        //1、数据如何获取
        String juzi = (String) input.getValueByField("juzi");
        //2、进行切割
        String[] strings = juzi.split(" ");
        //3、发送数据
        for (String word : strings) {
            //Values 对象帮我们生成一个list
            collector.emit(new Values(word, 1));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}

bolt2 单词统计代码:

public class MyWordCountAndPrintBolt extends BaseBasicBolt {
    private Map<String, Integer> wordCountMap = new HashMap<String, Integer>();

    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValueByField("word");
        Integer num = (Integer) input.getValueByField("num");
        //1、查看单词对应的value是否存在
        Integer integer = wordCountMap.get(word);
        if (integer == null || integer.intValue() == 0) {
            wordCountMap.put(word,num);
        }else {
            wordCountMap.put(word,integer.intValue()+num);
        }
        //2、打印数据
        System.out.println(wordCountMap);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //todo 不需要定义输出的字段
    }
}

storm 驱动类:

public class StormTopologyDriver {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        //1、准备任务信息
        //Storm框架支持多语言,在JAVA环境下创建一个拓扑,需要使用TopologyBuilder进行构建
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        /* MyLocalFileSpout类,主要是将文本内容读成一行一行的模式
         * 消息源spout是Storm里面一个topology里面的消息生产者。
         * 一般来说消息源会从一个外部源读取数据并且向topology里面发出消息:tuple。
         * Spout可以是可靠的也可以是不可靠的。
         * 如果这个tuple没有被storm成功处理,可靠的消息源spouts可以重新发射一个tuple,但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。
         *
         * 消息源可以发射多条消息流stream。多条消息流可以理解为多中类型的数据。
         * 使用OutputFieldsDeclarer.declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。
         *
         * Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple。
         * 要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。
         *
         * 另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。
         */

        topologyBuilder.setSpout("mySpout", new MyLocalFileSpout(), 2);
        topologyBuilder.setBolt("bolt1", new MySplitBolt(), 4).shuffleGrouping("mySpout");
        topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt(), 2).shuffleGrouping("bolt1");

        //2、任务提交
        //提交给谁?提交什么内容?
        Config config = new Config();
        //定义你希望集群分配多少个工作进程给你来执行这个topology
        config.setNumWorkers(2);
        StormTopology stormTopology = topologyBuilder.createTopology();

        //这在本地环境调试topology很有用, 但是在线上这么做的话会影响性能的。
        config.setDebug(false);
        //storm的运行有两种模式: 本地模式和分布式模式.

        //本地模式
//        LocalCluster localCluster = new LocalCluster();
//        localCluster.submitTopology("wordcount", config, stormTopology);
//        指定本地模式运行多长时间之后停止,如果不显式的关系程序将一直运行下去
//        Utils.sleep(10000);
//        localCluster.shutdown();

        // 集群模式
        StormSubmitter.submitTopology("wordcount1", config, stormTopology);
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast.storm</groupId>
    <artifactId>learnStorm</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <!--当打包到集群上运行时,集群中已经提供了storm jar包,使用provided,可以在打成的包中不包含该jar包-->
            <scope>provided</scope>
            <version>0.9.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.5</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>cn.itcast.storm.wordcount.StormTopologyDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

打包,上传至集群运行:
storm程序-单词统计wordcount

在storm ui上查看运行结果:
storm程序-单词统计wordcount

storm程序-单词统计wordcount

storm程序-单词统计wordcount

由上图可知,bolt2运行在 mini3 的6370端口。

登录mini3 查看运行日志:
storm程序-单词统计wordcount

相关标签: storm