Storm 1.1.0: the relationship between a component's parallelism and its number of instances

程序员文章站 2022-07-01 19:53:36

Software environment:

Apache-storm-1.1.0

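How does the parallelism of a component class relate to the number of instances created from it? Let's find out.
First, I configured the console to show only log messages at WARN level and above, plus standard output (see http://blog.csdn.net/gpwner/article/details/74170806).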

Then I used an example from storm-starter:


import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * This is a basic example of a Storm topology.
 */
public class ExclamationTopology {
    public static class PrinterBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
        }
    }

    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;

        public ExclamationBolt() {
        }

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
            // prepare() runs once per bolt instance, so each instance prints exactly one line.
            System.out.println("OBJECT:" + this + "   hashcode:" + this.hashCode());
        }

        @Override
        public void execute(Tuple tuple) {
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static void main(String[] args) throws Exception {
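        TopologyBuilder builder = new TopologyBuilder();
        // The third argument of setSpout/setBolt is the component's parallelism hint.
        builder.setSpout("word", new TestWordSpout(), 1);
        // exclaim1 is the Bolt under observation: parallelism hint 10.
        builder.setBolt("exclaim1", new ExclamationBolt(), 10).shuffleGrouping("word");
        builder.setBolt("exclaim2", new PrinterBolt(), 1).shuffleGrouping("exclaim1");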
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(1000000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}

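Distinct live instances of a class almost always have different default hash codes (uniqueness is not strictly guaranteed), so printing each object's hash code in prepare() is a simple way to see how a Bolt's parallelism relates to its number of instances.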
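
Since hash codes are not guaranteed to be unique, a slightly more robust check is to count instances by reference identity. The following is a minimal sketch in plain Java, independent of Storm. The class InstanceCounter and its register method are hypothetical names introduced here for illustration and are not part of the original example.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class InstanceCounter {
    // Identity-based set: membership is decided with ==, not equals()/hashCode().
    private static final Set<Object> SEEN =
            Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>());

    /** Records an instance and returns how many distinct instances have been seen so far. */
    public static synchronized int register(Object instance) {
        SEEN.add(instance);
        return SEEN.size();
    }

    public static void main(String[] args) {
        Object a = new Object();
        Object b = new Object();
        register(a);
        register(b);
        // a was already recorded, so the count stays at two.
        System.out.println(register(a)); // prints 2
    }
}
```

In the topology above, a call such as InstanceCounter.register(this) could be placed in prepare(). Note that a static set like this only counts correctly when all tasks run in the same JVM, as they do in local mode.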

Console output:

OBJECT:neu.ExclamationTopology$ExclamationBolt@893f20f   hashcode:143913487
OBJECT:neu.ExclamationTopology$ExclamationBolt@174efa3   hashcode:24440739
OBJECT:neu.ExclamationTopology$ExclamationBolt@1acd5eb9   hashcode:449666745
OBJECT:neu.ExclamationTopology$ExclamationBolt@2ba329fb   hashcode:732113403
OBJECT:neu.ExclamationTopology$ExclamationBolt@4cac74ad   hashcode:1286370477
OBJECT:neu.ExclamationTopology$ExclamationBolt@74cf9261   hashcode:1959760481
OBJECT:neu.ExclamationTopology$ExclamationBolt@741325f7   hashcode:1947411959
OBJECT:neu.ExclamationTopology$ExclamationBolt@655f6a1f   hashcode:1700751903
OBJECT:neu.ExclamationTopology$ExclamationBolt@ddafc5a   hashcode:232455258
OBJECT:neu.ExclamationTopology$ExclamationBolt@1b23229e   hashcode:455287454

For reference, the relevant part of the topology definition:

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word", new TestWordSpout(), 1);
        builder.setBolt("exclaim1", new ExclamationBolt(), 10).shuffleGrouping("word");
        builder.setBolt("exclaim2", new PrinterBolt(), 1).shuffleGrouping("exclaim1");

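The topology sets the parallelism of the exclaim1 Bolt to 10, and the output shows that ten distinct objects of this Bolt class were created. Storm creates one instance per task, and by default the number of tasks equals the parallelism hint (the number of executors).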
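
Why ten objects when the code calls new ExclamationBolt() only once? As far as I understand, Storm serializes the component object when the topology is built, and each task then deserializes its own copy. The sketch below imitates that with plain Java serialization. DemoBolt and copiesFor are hypothetical names rather than Storm APIs, and the code illustrates the mechanism, not Storm's actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SerializedCopies {
    /** Hypothetical stand-in for a bolt; Storm components are Serializable as well. */
    public static class DemoBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;

        public DemoBolt(String name) {
            this.name = name;
        }
    }

    /** Serializes the prototype once, then deserializes one independent copy per task. */
    public static List<DemoBolt> copiesFor(DemoBolt prototype, int numTasks)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(prototype);
        }
        byte[] payload = bytes.toByteArray();

        List<DemoBolt> copies = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            // Each read builds a brand-new object from the same bytes.
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(payload))) {
                copies.add((DemoBolt) in.readObject());
            }
        }
        return copies;
    }

    public static void main(String[] args) throws Exception {
        DemoBolt prototype = new DemoBolt("exclaim1");
        for (DemoBolt copy : copiesFor(prototype, 10)) {
            System.out.println("OBJECT:" + copy.name
                    + "   hashcode:" + System.identityHashCode(copy));
        }
    }
}
```

Running main prints ten lines, one per copy, much like the console output of the topology. The copies share the same field values but are separate objects, so state kept in a bolt's instance fields is not shared between tasks.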