Reading from Kafka in Real Time with Storm


Use Storm's real-time processing to read messages from Kafka, merge the received words into a sentence, and print it (type hello, world, and . one at a time; the "." marks the end of the sentence).

Storm version: 1.1.1

Kafka version: 0.11.0.0 (Scala 2.12 build)

ZooKeeper version: 3.4.9

1. Kafka Configuration

Edit server.properties:

# Advertise the broker's public IP
advertised.listeners=PLAINTEXT://*.*.*.*:9092
# Allow topics to be deleted
delete.topic.enable=true

 

Kafka commands:

# Start the broker
./bin/kafka-server-start.sh ./config/server.properties
# Create the topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic words_topic
# Send messages from the console producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic words_topic
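
If you prefer to send the test messages from Java instead of the console producer, the following is a minimal sketch using the Kafka producer API (it assumes the broker at localhost:9092 and the kafka-clients classes pulled in transitively by the kafka_2.12 dependency; WordsProducer is a hypothetical helper class, not part of the topology):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WordsProducer {
	public static void main(String[] args) {
		Properties props = new Properties();
		// Broker address and String serializers for keys and values
		props.put("bootstrap.servers", "localhost:9092");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
			// Send one word per message, ending with the "." terminator
			for (String word : new String[] { "hello", "world", "." }) {
				producer.send(new ProducerRecord<>("words_topic", word));
			}
		}
	}
}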

2. Maven Project

Create a new Maven project and add the following to pom.xml:

 

<properties>
	<kafka.version>0.11.0.0</kafka.version>
	<storm.version>1.1.1</storm.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka_2.12</artifactId>
		<version>${kafka.version}</version>
		<exclusions>
			<exclusion>
				<groupId>javax.jms</groupId>
				<artifactId>jms</artifactId>
			</exclusion>
			<exclusion>
				<groupId>com.sun.jdmk</groupId>
				<artifactId>jmxtools</artifactId>
			</exclusion>
			<exclusion>
				<groupId>com.sun.jmx</groupId>
				<artifactId>jmxri</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-kafka</artifactId>
		<version>${storm.version}</version>
	</dependency>

	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-core</artifactId>
		<version>${storm.version}</version>
		<exclusions>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>log4j-over-slf4j</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>commons-collections</groupId>
		<artifactId>commons-collections</artifactId>
		<version>3.2.1</version>
	</dependency>
	<dependency>
		<groupId>com.google.guava</groupId>
		<artifactId>guava</artifactId>
		<version>15.0</version>
	</dependency>
</dependencies>
<build>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>3.5.1</version>
			<configuration>
				<source>1.8</source>
				<target>1.8</target>
			</configuration>
		</plugin>
		<plugin>
			<groupId>org.codehaus.mojo</groupId>
			<artifactId>exec-maven-plugin</artifactId>
			<version>1.2.1</version>
			<executions>
				<execution>
					<goals>
						<goal>exec</goal>
					</goals>
				</execution>
			</executions>
			<configuration>
				<executable>java</executable>
				<includeProjectDependencies>true</includeProjectDependencies>
				<includePluginDependencies>false</includePluginDependencies>
				<classpathScope>compile</classpathScope>
				<mainClass>com.learningstorm.kafka.KafkaTopology</mainClass>
			</configuration>
		</plugin>
		<plugin>
			<artifactId>maven-assembly-plugin</artifactId>
			<configuration>
				<descriptorRefs>
					<descriptorRef>jar-with-dependencies</descriptorRef>
				</descriptorRefs>
				<archive>
					<manifest>
						<mainClass></mainClass>
					</manifest>
				</archive>
			</configuration>
			<executions>
				<execution>
					<id>make-assembly</id>
					<phase>package</phase>
					<goals>
						<goal>single</goal>
					</goals>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>

Create SentenceBolt

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// StringUtils comes from commons-lang, pulled in transitively by storm-core
import org.apache.commons.lang.StringUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import com.google.common.collect.ImmutableList;

public class SentenceBolt extends BaseRichBolt {

	private static final long serialVersionUID = 9063211371729556973L;

	// Words accumulated so far for the current sentence
	private List<String> words = new ArrayList<String>();

	private OutputCollector _collector;

	@Override
	public void execute(Tuple input) {
		// Get the input word
		String word = input.getString(0);
		if (StringUtils.isBlank(word)) {
			// Skip blank messages, but still ack them
			_collector.ack(input);
			return;
		}
		System.out.println("Received Word:" + word);
		// Add the word to the current list of words
		words.add(word);
		if (word.endsWith(".")) { // Terminator: emit the whole sentence
			_collector.emit(ImmutableList.of((Object) StringUtils.join(words, ' ')));
			words.clear();
		}
		// Ack the tuple so the KafkaSpout does not replay it
		_collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		_collector = collector;
	}

}

Create PrinterBolt

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class PrinterBolt extends BaseRichBolt {

	private static final long serialVersionUID = 9063211371729556973L;

	private OutputCollector _collector;

	@Override
	public void execute(Tuple input) {
		// Print the assembled sentence and ack the tuple
		String sentence = input.getString(0);
		System.out.println("Received Sentence: " + sentence);
		_collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// Terminal bolt: no output fields to declare
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		_collector = collector;
	}

}
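
As a design note, a bolt that only consumes tuples can also extend BaseBasicBolt, which acks each tuple automatically after execute returns. A minimal sketch of the same printer written that way (the class name BasicPrinterBolt is just for illustration):

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class BasicPrinterBolt extends BaseBasicBolt {

	private static final long serialVersionUID = 1L;

	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		// Tuples are acked automatically once this method returns
		System.out.println("Received Sentence: " + input.getString(0));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// No downstream bolts, nothing to declare
	}
}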

Create KafkaTopology

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaTopology {
	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		// ZooKeeper hosts for the Kafka cluster
		ZkHosts zkHosts = new ZkHosts("ip:2181");

		// Topic to read from, ZooKeeper root path for offsets, and consumer id
		SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "words_topic", "", "id7");

		// Deserialize Kafka messages as plain strings
		kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

		TopologyBuilder builder = new TopologyBuilder();

		builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);

		builder.setBolt("SentenceBolt", new SentenceBolt(), 1).globalGrouping("KafkaSpout");
		builder.setBolt("PrinterBolt", new PrinterBolt(), 1).globalGrouping("SentenceBolt");

		// Local mode
		LocalCluster cluster = new LocalCluster();
		Config conf = new Config();
		// Submit the topology for execution
		cluster.submitTopology("KafkaTopology", conf, builder.createTopology());
		try {
			System.out.println("Waiting to consume from kafka");
			Thread.sleep(30000);
		} catch (Exception exception) {
			System.out.println("Thread interrupted exception : " + exception);
		}
		// Kill the topology
		cluster.killTopology("KafkaTopology");
		// Shut down the local test cluster
		cluster.shutdown();
	}
}
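
To run on a real Storm cluster instead of in local mode, the same wiring can be submitted through StormSubmitter. A minimal sketch under the assumptions that the jar is built with the jar-with-dependencies assembly and a Nimbus is reachable (KafkaTopologySubmit is an illustrative class name, not part of the original project):

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaTopologySubmit {
	public static void main(String[] args) throws Exception {
		// Same spout/bolt wiring as KafkaTopology
		ZkHosts zkHosts = new ZkHosts("ip:2181");
		SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "words_topic", "", "id7");
		kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
		builder.setBolt("SentenceBolt", new SentenceBolt(), 1).globalGrouping("KafkaSpout");
		builder.setBolt("PrinterBolt", new PrinterBolt(), 1).globalGrouping("SentenceBolt");

		Config conf = new Config();
		conf.setNumWorkers(1);
		// Submit to the cluster configured in storm.yaml, e.g.:
		//   storm jar target/<artifact>-jar-with-dependencies.jar com.learningstorm.kafka.KafkaTopologySubmit
		StormSubmitter.submitTopology("KafkaTopology", conf, builder.createTopology());
	}
}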

Run result

// Messages sent from the console producer
>hello
>world
>.

// Console output
Waiting to consume from kafka
Received Word:hello
Received Word:world
Received Word:.
Received Sentence: hello world.