Storm实时读取Kafka
程序员文章站
2022-03-20 20:57:17
...
利用Storm的实时处理功能,从Kafka中读取消息,将消息合并后并打印(依次输入hello world .)
Storm版本:1.1.1
Kafka版本:2.12-0.11.0.0
Zookeeper版本:3.4.9
1、Kafka配置
server.properties文件修改
#发布外网ip
advertised.listeners=PLAINTEXT://*.*.*.*:9092
#删除topic
delete.topic.enable=true
kafka相关脚本
//启动 ./bin/kafka-server-start.sh ./config/server.properties //创建topic ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic words_topic //发送消息 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic words_topic
2、maven工程
新建maven项目,添加pom.xml配置
<properties> <kafka.version>0.11.0.0</kafka.version> <storm.version>1.1.1</storm.version> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> </exclusion> <exclusion> <groupId>com.sun.jdmk</groupId> <artifactId>jmxtools</artifactId> </exclusion> <exclusion> <groupId>com.sun.jmx</groupId> <artifactId>jmxri</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>${storm.version}</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2.1</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>15.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>1.2.1</version> <executions> <execution> <goals> <goal>exec</goal> </goals> </execution> </executions> <configuration> <executable>java</executable> <includeProjectDependencies>true</includeProjectDependencies> <includePluginDependencies>false</includePluginDependencies> <classpathScope>compile</classpathScope> <mainClass>com.learningstorm.kafka.KafkaTopology</mainClass> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
创建SentenceBolt
public class SentenceBolt extends BaseRichBolt { /** * */ private static final long serialVersionUID = 9063211371729556973L; private List<String> words = new ArrayList<String>(); OutputCollector _collector; @Override public void execute(Tuple input) { // 获取输入单词 String word = input.getString(0); if (StringUtils.isBlank(word)) { return; } System.out.println("Received Word:" + word); // add word to current list of words words.add(word); if (word.endsWith(".")) { //结束符,打印整条语句 _collector.emit(ImmutableList.of((Object) StringUtils.join(words, ' '))); words.clear(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _collector = collector; } }
创建PrinterBolt
public class PrinterBolt extends BaseRichBolt { /** * */ private static final long serialVersionUID = 9063211371729556973L; @Override public void execute(Tuple input) { String sentence = input.getString(0); System.out.println("Received Sentence: " + sentence); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } }
创建KafkaTopology
public class KafkaTopology { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { // zookeeper hosts for the Kafka cluster ZkHosts zkHosts = new ZkHosts("ip:2181"); SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "words_topic", "", "id7"); kafkaConfig.scheme = new SchemeAsMultiScheme((Scheme) new StringScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1); builder.setBolt("SentenceBolt", new SentenceBolt(), 1).globalGrouping("KafkaSpout"); builder.setBolt("PrinterBolt", new PrinterBolt(), 1).globalGrouping("SentenceBolt"); //本地模式 LocalCluster cluster = new LocalCluster(); Config conf = new Config(); // Submit topology for execution cluster.submitTopology("KafkaToplogy", conf, builder.createTopology()); try { System.out.println("Waiting to consume from kafka"); Thread.sleep(30000); } catch (Exception exception) { System.out.println("Thread interrupted exception : " + exception); } // kill the KafkaTopology cluster.killTopology("KafkaToplogy"); // shut down the storm test cluster cluster.shutdown(); } }
运行结果
//控制台发送消息 >hello >world >. //结果显示 Waiting to consume from kafka Received Word:hello Received Word:world Received Word:. Received Sentence: hello world.
上一篇: ActiveMQ的JMS使用
下一篇: 一道关于单词归类的算法题简单求解
推荐阅读
-
使用Flume+Kafka+SparkStreaming进行实时日志解析
-
Spring boot集成Kafka+Storm的示例代码
-
Spring boot集成Kafka+Storm的示例代码
-
kafka与storm集群环境的安装步骤详解
-
kafka与storm集群环境的安装步骤详解
-
Python 基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控
-
python 读取视频,处理后,实时计算帧数fps的方法
-
mysql-replication实时读取binlog
-
OpenCV学习笔记(一):使用opencv读取摄像头并实时显示
-
Storm 系列(九)—— Storm 集成 Kafka