Storm error: Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout])
Problem description:
Storm version: 1.2.2; Kafka version: 2.11.
While using Storm to consume data from Kafka, the following error occurred.
[root@node01 jars]# /opt/storm-1.2.2/bin/storm jar myproject-1.0-SNAPSHOT-jar-with-dependencies.jar com.suhaha.storm.storm122_kafka211_demo02.KafkaTopoDemo stormkafka
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/storm-1.2.2/lib/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/jars/myproject-1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Running: /usr/java/jdk1.8.0_181/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/storm-1.2.2 -Dstorm.log.dir=/opt/storm-1.2.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm-1.2.2/*:/opt/storm-1.2.2/lib/*:/opt/storm-1.2.2/extlib/*:myproject-1.0-SNAPSHOT-jar-with-dependencies.jar:/opt/storm-1.2.2/conf:/opt/storm-1.2.2/bin -Dstorm.jar=myproject-1.0-SNAPSHOT-jar-with-dependencies.jar -Dstorm.dependency.jars= -Dstorm.dependency.artifacts={} com.suhaha.storm.storm122_kafka211_demo02.KafkaTopoDemo stormkafka
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/storm-1.2.2/lib/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/jars/myproject-1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
1329 [main] INFO  o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer property 'auto.offset.reset' to 'earliest' to ensure at-least-once processing
1338 [main] INFO  o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer property 'enable.auto.commit' to 'false', because the spout does not support auto-commit
【run on cluster】
1617 [main] WARN  o.a.s.u.Utils - STORM-VERSION new 1.2.2 old null
1699 [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -9173161025072727826:-6858502481790933429
1857 [main] WARN  o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
1917 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : node01:6627
1947 [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
1948 [main] WARN  o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
1950 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : node01:6627
1998 [main] INFO  o.a.s.StormSubmitter - Uploading dependencies - jars...
1998 [main] INFO  o.a.s.StormSubmitter - Uploading dependencies - artifacts...
1998 [main] INFO  o.a.s.StormSubmitter - Dependency Blob keys - jars : [] / artifacts : []
2021 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar myproject-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /var/storm/nimbus/inbox/stormjar-ce16c5f2-db05-4d0c-8c55-01512ed64ee7.jar
3832 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /var/storm/nimbus/inbox/stormjar-ce16c5f2-db05-4d0c-8c55-01512ed64ee7.jar
3832 [main] INFO  o.a.s.StormSubmitter - Submitting topology stormkafka in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-9173161025072727826:-6858502481790933429","topology.workers":1,"topology.debug":true}
3832 [main] WARN  o.a.s.u.Utils - STORM-VERSION new 1.2.2 old 1.2.2
5588 [main] WARN  o.a.s.StormSubmitter - Topology submission exception: Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout]
Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout])
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
    at com.suhaha.storm.storm122_kafka211_demo02.KafkaTopoDemo.main(KafkaTopoDemo.java:47)
Caused by: InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout])
    at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
    at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
    at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
    at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
    at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
    at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
    at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
    ... 3 more
The error means: when the mybolt component consumes messages from the kafka_spout component, the default stream it subscribes to does not exist.
The error above is caused by a mistake in the code. The code is posted below.
1) KafkaTopoDemo class (the main-method entry class, where the KafkaSpout is set up)
 1 package com.suhaha.storm.storm122_kafka211_demo02;
 2
 3 import org.apache.kafka.clients.consumer.ConsumerConfig;
 4 import org.apache.storm.Config;
 5 import org.apache.storm.LocalCluster;
 6 import org.apache.storm.StormSubmitter;
 7 import org.apache.storm.generated.AlreadyAliveException;
 8 import org.apache.storm.generated.AuthorizationException;
 9 import org.apache.storm.generated.InvalidTopologyException;
10 import org.apache.storm.kafka.spout.*;
11 import org.apache.storm.topology.TopologyBuilder;
12 import org.apache.storm.tuple.Fields;
13 import org.apache.storm.tuple.Values;
14 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
15 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
16
17 /**
18  * @author suhaha
19  * @create 2019-04-28 00:44
20  * @comment Storm consuming data from Kafka
21  */
22
23 public class KafkaTopoDemo {
24     public static void main(String[] args) {
25         final TopologyBuilder topologyBuilder = new TopologyBuilder();
26         // simple, unreliable spout
27         // topologyBuilder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("node01:9092,node02:9092,node03:9092", "topic01").build()));
28
29         // detailed spout setup: a helper method builds the KafkaSpoutConfig
30         topologyBuilder.setSpout("kafka_spout", new KafkaSpout<String, String>(newKafkaSpoutConfig("topic01")));
31
32         topologyBuilder.setBolt("mybolt", new MyBolt("/tmp/storm_test.log")).shuffleGrouping("kafka_spout");
33
34         // the topology is wired up above; now set the Storm config
35         Config stormConf = new Config();
36         stormConf.setNumWorkers(1);
37         stormConf.setDebug(true);
38
39         if (args != null && args.length > 0) {    // submit to the cluster
40             System.out.println("【run on cluster】");
41
42             try {
43                 StormSubmitter.submitTopology(args[0], stormConf, topologyBuilder.createTopology());
44             } catch (AlreadyAliveException e) {
45                 e.printStackTrace();
46             } catch (InvalidTopologyException e) {
47                 e.printStackTrace();
48             } catch (AuthorizationException e) {
49                 e.printStackTrace();
50             }
51             System.out.println("Submission finished");
52
53         } else {    // run locally
54             System.out.println("【run on local】");
55             LocalCluster lc = new LocalCluster();
56             lc.submitTopology("storm_kafka", stormConf, topologyBuilder.createTopology());
57         }
58     }
59
60
61     /**
62      * KafkaSpoutConfig setup
63      */
64     private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig(String topic) {
65         ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
66                 (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
67                 new Fields("topic", "partition", "offset", "key", "value"), "stream1");
68         // bootstrap servers and topic
69         return KafkaSpoutConfig.builder("node01:9092,node02:9092,node03:9092", topic)
70                 .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime())    // set the Kafka consumer group property "group.id"
71                 .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
72                 .setRecordTranslator(trans)    // controls how the spout turns a Kafka consumer message into a tuple, and which stream the tuple is published to
73                 .setRetry(getRetryService())    // retry policy
74                 .setOffsetCommitPeriodMs(10_000)
75                 .setFirstPollOffsetStrategy(EARLIEST)    // lets you set where to start consuming data
76                 .setMaxUncommittedOffsets(250)
77                 .build();
78     }
79
80     /**
81      * Retry policy setup
82      */
83     protected static KafkaSpoutRetryService getRetryService() {
84         return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
85                 TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
86     }
87 }
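A side note for comparison: the "simple" spout commented out at line 27 relies on the library's built-in DefaultRecordTranslator instead of a custom one. In storm-kafka-client 1.2.x that default translator produces the same five fields as the custom translator above, but emits them on the default stream, which is why the simple variant would not hit this error. A simplified sketch of the 1.2.x class, reconstructed from memory and not a verbatim copy of the source:

    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {
        public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");

        @Override
        public List<Object> apply(ConsumerRecord<K, V> record) {
            // same tuple layout as the custom translator in newKafkaSpoutConfig()
            return new Values(record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
        }

        @Override
        public Fields getFieldsFor(String stream) {
            return FIELDS;
        }

        @Override
        public List<String> streams() {
            return DEFAULT_STREAM;    // defined on RecordTranslator as a singleton list of "default"
        }
    }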
2) Bolt class (not really related to the problem)
 1 package com.suhaha.storm.storm122_kafka211_demo02;
 2
 3 import org.apache.storm.task.OutputCollector;
 4 import org.apache.storm.task.TopologyContext;
 5 import org.apache.storm.topology.IRichBolt;
 6 import org.apache.storm.topology.OutputFieldsDeclarer;
 7 import org.apache.storm.tuple.Tuple;
 8 import java.io.FileWriter;
 9 import java.io.IOException;
10 import java.util.Map;
11
12 /**
13  * @author suhaha
14  * @create 2019-04-28 01:05
15  * @comment The logic in this bolt is very simple: it just pulls the individual fields out of the input tuple and prints them,
16  *          and also writes them to the file given by path (note that the output file ends up on the worker that runs this bolt task,
17  *          not under a path on the nimbus server, and not necessarily on the server the Storm job was submitted from)
18  */
19
20 public class MyBolt implements IRichBolt {
21     private FileWriter fileWriter = null;
22     String path = null;
23
24     @Override
25     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
26         try {
27             fileWriter = new FileWriter(path);
28         } catch (IOException e) {
29             e.printStackTrace();
30         }
31     }
32
33     /**
34      * Constructor
35      * @param path
36      */
37     public MyBolt(String path) {
38         this.path = path;
39     }
40
41
42     @Override
43     public void execute(Tuple input) {
44         System.out.println(input);
45         try {
46             /**
47              * pull the individual fields out of the input tuple
48              */
49             System.out.println("=========================");
50             String topic = input.getString(0);
51             System.out.println("index 0 --> " + topic);    // topic
52             System.out.println("topic --> " + input.getStringByField("topic"));
53
54             System.out.println("-------------------------");
55             System.out.println("index 1 --> " + input.getInteger(1));    // partition
56             Integer partition = input.getIntegerByField("partition");
57             System.out.println("partition-> " + partition);
58
59             System.out.println("-------------------------");
60             long offset = input.getLong(2);
61             System.out.println("index 2 --> " + offset);    // offset
62             System.out.println("offset----> " + input.getLongByField("offset"));
63
64             System.out.println("-------------------------");
65             String key = input.getString(3);
66             System.out.println("index 3 --> " + key);    // key
67             System.out.println("key-------> " + input.getStringByField("key"));
68
69             System.out.println("-------------------------");
70             String value = input.getString(4);
71             System.out.println("index 4 --> " + value);    // value
72             System.out.println("value--> " + input.getStringByField("value"));
73
74             String info = "topic: " + topic + ", partition: " + partition + ", offset: " + offset + ", key: " + key + ", value: " + value + "\n";
75             System.out.println("info = " + info);
76             fileWriter.write(info);
77             fileWriter.flush();
78         } catch (Exception e) {
79             e.printStackTrace();
80         }
81     }
82
83     @Override
84     public void cleanup() {
85         // TODO Auto-generated method stub
86     }
87
88     @Override
89     public void declareOutputFields(OutputFieldsDeclarer declarer) {
90         // TODO Auto-generated method stub
91     }
92
93     @Override
94     public Map<String, Object> getComponentConfiguration() {
95         // TODO Auto-generated method stub
96         return null;
97     }
98 }
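One thing worth noticing in passing: declareOutputFields here is an empty stub, so mybolt itself declares no output streams. That is harmless for a terminal bolt like this one, but any bolt that tried to subscribe to mybolt would fail topology validation with the same kind of InvalidTopologyException. If the bolt were ever extended to emit tuples, the declaration could name an explicit stream; in the sketch below the stream and field names are made up for illustration:

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declarer.declare(new Fields(...)) would declare the "default" stream;
        // declareStream() declares a named stream that downstream bolts must
        // subscribe to explicitly, e.g. shuffleGrouping("mybolt", "logged").
        declarer.declareStream("logged", new Fields("topic", "value"));
    }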
The mistake is in the KafkaTopoDemo class, at lines 32 and 67 of the code above (highlighted in yellow in the original post).
The cause of the error is this: when the record translator is configured (line 67), the stream ID is set to stream1, but when the bolt is registered with the TopologyBuilder (line 32), the grouping is declared as shuffleGrouping("kafka_spout"). The error actually has nothing to do with the grouping strategy itself; it comes from how the grouping is declared. When the shuffleGrouping(String componentId) overload is used, the mybolt component subscribes by default to the upstream component's default stream. Since the code sets the upstream (kafka_spout) stream ID to stream1, the default stream does not exist, hence the error InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout]) complaining that the default stream does not exist.
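To make the mechanics concrete: in Storm's TopologyBuilder, the one-argument grouping overloads simply delegate to the two-argument versions with Utils.DEFAULT_STREAM_ID, which is the string "default". So line 32 is shorthand for the subscription written out explicitly below, and that is exactly the subscription Nimbus rejects:

    import org.apache.storm.utils.Utils;

    // What line 32 says:
    //     topologyBuilder.setBolt("mybolt", new MyBolt("/tmp/storm_test.log")).shuffleGrouping("kafka_spout");
    // is shorthand for a subscription to kafka_spout's "default" stream:
    topologyBuilder.setBolt("mybolt", new MyBolt("/tmp/storm_test.log"))
            .shuffleGrouping("kafka_spout", Utils.DEFAULT_STREAM_ID);    // Utils.DEFAULT_STREAM_ID == "default"
    // The spout in this topology only declares "stream1", so validation
    // rejects this subscription at submission time.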
The fix is therefore to declare mybolt's grouping with the shuffleGrouping(String componentId, String streamId) overload and explicitly subscribe to the stream ID stream1. With that change, the program no longer throws the error:
topologyBuilder.setBolt("mybolt", new MyBolt("/tmp/storm_test.log")).shuffleGrouping("kafka_spout", "stream1");
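An alternative fix in the opposite direction, if a custom stream name is not actually needed, is to keep shuffleGrouping("kafka_spout") unchanged and instead drop the explicit stream ID from the translator, so the spout emits on the default stream. ByTopicRecordTranslator also offers a two-argument constructor which, as far as I can tell from the 1.2.x API, emits to "default"; a sketch:

    // in newKafkaSpoutConfig(): no third constructor argument, so tuples go to
    // the "default" stream and the one-argument shuffleGrouping works as-is
    ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
            (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
            new Fields("topic", "partition", "offset", "key", "value"));

Either way, the rule is the same: the stream ID the spout emits on and the stream ID the bolt subscribes to must match.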