
Storm error: Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout])


Problem description:

  Storm version: 1.2.2; Kafka version: 2.11.

  While using Storm to consume data from Kafka, the following error occurred.

[root@node01 jars]# /opt/storm-1.2.2/bin/storm jar myproject-1.0-SNAPSHOT-jar-with-dependencies.jar com.suhaha.storm.storm122_kafka211_demo02.KafkaTopoDemo stormkafka
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/storm-1.2.2/lib/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/jars/myproject-1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Running: /usr/java/jdk1.8.0_181/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/storm-1.2.2 -Dstorm.log.dir=/opt/storm-1.2.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm-1.2.2/*:/opt/storm-1.2.2/lib/*:/opt/storm-1.2.2/extlib/*:myproject-1.0-SNAPSHOT-jar-with-dependencies.jar:/opt/storm-1.2.2/conf:/opt/storm-1.2.2/bin -Dstorm.jar=myproject-1.0-SNAPSHOT-jar-with-dependencies.jar -Dstorm.dependency.jars= -Dstorm.dependency.artifacts={} com.suhaha.storm.storm122_kafka211_demo02.KafkaTopoDemo stormkafka
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/storm-1.2.2/lib/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/jars/myproject-1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
1329 [main] INFO  o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer property 'auto.offset.reset' to 'earliest' to ensure at-least-once processing
1338 [main] INFO  o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer property 'enable.auto.commit' to 'false', because the spout does not support auto-commit
【run on cluster】
1617 [main] WARN  o.a.s.u.Utils - STORM-VERSION new 1.2.2 old null
1699 [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -9173161025072727826:-6858502481790933429
1857 [main] WARN  o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
1917 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : node01:6627
1947 [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
1948 [main] WARN  o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
1950 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : node01:6627
1998 [main] INFO  o.a.s.StormSubmitter - Uploading dependencies - jars...
1998 [main] INFO  o.a.s.StormSubmitter - Uploading dependencies - artifacts...
1998 [main] INFO  o.a.s.StormSubmitter - Dependency Blob keys - jars : [] / artifacts : []
2021 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar myproject-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /var/storm/nimbus/inbox/stormjar-ce16c5f2-db05-4d0c-8c55-01512ed64ee7.jar
3832 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /var/storm/nimbus/inbox/stormjar-ce16c5f2-db05-4d0c-8c55-01512ed64ee7.jar
3832 [main] INFO  o.a.s.StormSubmitter - Submitting topology stormkafka in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-9173161025072727826:-6858502481790933429","topology.workers":1,"topology.debug":true}
3832 [main] WARN  o.a.s.u.Utils - STORM-VERSION new 1.2.2 old 1.2.2
5588 [main] WARN  o.a.s.StormSubmitter - Topology submission exception: Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout]
Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout])
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
    at com.suhaha.storm.storm122_kafka211_demo02.KafkaTopoDemo.main(KafkaTopoDemo.java:47)
Caused by: InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout])
    at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
    at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
    at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
    at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
    at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
    at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
    at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
    ... 3 more



What the error means: the mybolt component subscribes to the default stream of the kafka_spout component, but that stream does not exist.

The error is caused by a mistake in the code. The code is posted below.

1) The KafkaTopoDemo class (main-method entry point and KafkaSpout setup)

 1 package com.suhaha.storm.storm122_kafka211_demo02;
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerConfig;
 4 import org.apache.storm.Config;
 5 import org.apache.storm.LocalCluster;
 6 import org.apache.storm.StormSubmitter;
 7 import org.apache.storm.generated.AlreadyAliveException;
 8 import org.apache.storm.generated.AuthorizationException;
 9 import org.apache.storm.generated.InvalidTopologyException;
10 import org.apache.storm.kafka.spout.*;
11 import org.apache.storm.topology.TopologyBuilder;
12 import org.apache.storm.tuple.Fields;
13 import org.apache.storm.tuple.Values;
14 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
15 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
16 
17 /**
18  * @author suhaha
19  * @create 2019-04-28 00:44
20  * @comment Consume Kafka data with Storm
21  */
22 
23 public class KafkaTopoDemo {
24     public static void main(String[] args) {
25         final TopologyBuilder topologyBuilder = new TopologyBuilder();
26         // Simple, unreliable spout
27 //        topologyBuilder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("node01:9092,node02:9092,node03:9092", "topic01").build()));
28 
29         // Detailed spout setup: a separate method builds the KafkaSpoutConfig
30         topologyBuilder.setSpout("kafka_spout", new KafkaSpout<String, String>(newKafkaSpoutConfig("topic01")));
31 
32         topologyBuilder.setBolt("mybolt", new MyBolt("/tmp/storm_test.log")).shuffleGrouping("kafka_spout");   // <-- subscribes to the "default" stream of kafka_spout
33 
34         // The topology is wired up above; now set the Storm config
35         Config stormConf = new Config();
36         stormConf.setNumWorkers(1);
37         stormConf.setDebug(true);
38 
39         if (args != null && args.length > 0) {  // submit to the cluster
40             System.out.println("【run on cluster】");
41 
42             try {
43                 StormSubmitter.submitTopology(args[0], stormConf, topologyBuilder.createTopology());
44             } catch (AlreadyAliveException e) {
45                 e.printStackTrace();
46             } catch (InvalidTopologyException e) {
47                 e.printStackTrace();
48             } catch (AuthorizationException e) {
49                 e.printStackTrace();
50             }
51             System.out.println("Submission finished");
52 
53         } else {  // local submission
54             System.out.println("【run on local】");
55             LocalCluster lc = new LocalCluster();
56             lc.submitTopology("storm_kafka", stormConf, topologyBuilder.createTopology());
57         }
58     }
59 
60 
61     /**
62      * KafkaSpoutConfig setup
63     */
64     private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig(String topic) {
65         ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
66                 (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
67                 new Fields("topic", "partition", "offset", "key", "value"), "stream1");   // <-- stream id set to "stream1"
68         // bootstrap servers and topic
69         return KafkaSpoutConfig.builder("node01:9092,node02:9092,node03:9092", topic)
70                 .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime())  // set the Kafka consumer group property "group.id"
71                 .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
72                 .setRecordTranslator(trans)  // control how the spout turns a Kafka consumer message into a tuple, and which stream the tuple is emitted to
73                 .setRetry(getRetryService())  // retry strategy
74                 .setOffsetCommitPeriodMs(10_000)
75                 .setFirstPollOffsetStrategy(EARLIEST)  // where to start consuming
76                 .setMaxUncommittedOffsets(250)
77                 .build();
78     }
79 
80     /**
81      * Retry strategy setup
82     */
83     protected static KafkaSpoutRetryService getRetryService() {
84         return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
85             TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
86     }
87 }
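A note on line 67: the stream id passed to ByTopicRecordTranslator is what the KafkaSpout later declares as its output stream. As a quick sanity check you can print the streams the translator emits on; a sketch, assuming the streams() accessor that RecordTranslator exposes in storm-kafka-client 1.2.x:

// Sanity check (sketch): list the streams the spout will declare.
// Uses the same imports as the class above (org.apache.storm.kafka.spout.*).
RecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
        (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
        new Fields("topic", "partition", "offset", "key", "value"), "stream1");
System.out.println(trans.streams());   // prints [stream1] -- there is no "default" stream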


2) The Bolt class (not really related to the problem)

package com.suhaha.storm.storm122_kafka211_demo02;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;

/**
 * @author suhaha
 * @create 2019-04-28 01:05
 * @comment The processing logic in this bolt is trivial: it pulls each field out of the input
 *          tuple, prints it, and appends the record to the file named by path. (Note that the
 *          output file ends up on the worker that runs this bolt task, not on the Nimbus server,
 *          and not necessarily on the machine the Storm job was submitted from.)
 */

public class MyBolt implements IRichBolt {
    private FileWriter fileWriter = null;
    String path = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        try {
            fileWriter = new FileWriter(path);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Constructor
     * @param path
     */
    public MyBolt(String path) {
        this.path = path;
    }


    @Override
    public void execute(Tuple input) {
        System.out.println(input);
        try {
            /**
             * Pull each field out of the input tuple
             */
            System.out.println("=========================");
            String topic = input.getString(0);
            System.out.println("index 0 --> " + topic);        // topic
            System.out.println("topic   --> " + input.getStringByField("topic"));

            System.out.println("-------------------------");
            System.out.println("index 1 --> " + input.getInteger(1));        // partition
            Integer partition = input.getIntegerByField("partition");
            System.out.println("partition-> " + partition);

            System.out.println("-------------------------");
            long offset = input.getLong(2);
            System.out.println("index 2 --> " + offset);        // offset
            System.out.println("offset----> " + input.getLongByField("offset"));

            System.out.println("-------------------------");
            String key = input.getString(3);
            System.out.println("index 3 --> " + key);        // key
            System.out.println("key-------> " + input.getStringByField("key"));

            System.out.println("-------------------------");
            String value = input.getString(4);
            System.out.println("index 4 --> " + value);        // value
            System.out.println("value--> " + input.getStringByField("value"));

            String info = "topic: " + topic + ", partition: " + partition + ", offset: " + offset + ", key: " + key + ", value: " + value + "\n";
            System.out.println("info = " + info);
            fileWriter.write(info);
            fileWriter.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
}
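For contrast, the "default" stream named in the error message is simply the stream a component declares when it does not pick a stream id explicitly. This bolt emits nothing, so its empty declareOutputFields is harmless; but if it did emit tuples, the same choice would arise there. A sketch (the field name "info" and stream name "mystream" are hypothetical):

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // declare(...) puts the fields on the "default" stream...
    declarer.declare(new Fields("info"));
    // ...while declareStream(...) names an explicit stream, which downstream bolts
    // would then have to subscribe to by id, e.g. shuffleGrouping("mybolt", "mystream"):
    // declarer.declareStream("mystream", new Fields("info"));
}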


The mistake is in the KafkaTopoDemo class; the two offending lines (32 and 67) are flagged with "// <--" comments in the code above.

The root cause: when the RecordTranslator is configured (line 67), the stream id is set to stream1, while the bolt is wired up (line 32) with shuffleGrouping("kafka_spout"). The error has nothing to do with the shuffle grouping strategy itself, only with how it is invoked: the single-argument form shuffleGrouping(String componentId) subscribes mybolt to the upstream component's default stream. Since the code sets the upstream (kafka_spout) stream id to stream1, no default stream exists, and submission fails with InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout]).

The fix is therefore to use the two-argument form shuffleGrouping(String componentId, String streamId) when setting up the mybolt component, explicitly naming stream1 as the stream to read from. With that change, the error no longer occurs:

topologyBuilder.setBolt("mybolt", new MyBolt("/tmp/storm_test.log")).shuffleGrouping("kafka_spout", "stream1");
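An alternative fix is to leave the bolt wiring alone and have the spout emit on the default stream instead. A minimal sketch, assuming the two-argument ByTopicRecordTranslator constructor (no explicit stream id) emits on the default stream:

// Alternative fix (sketch): translator without a stream id, so tuples go to "default"
// and the one-argument shuffleGrouping("kafka_spout") works unchanged.
ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
        (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
        new Fields("topic", "partition", "offset", "key", "value"));

topologyBuilder.setBolt("mybolt", new MyBolt("/tmp/storm_test.log"))
        .shuffleGrouping("kafka_spout");   // "default" now exists

Either way, the stream id the spout declares and the stream id the bolt subscribes to must match.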