欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka入门

程序员文章站 2022-03-21 23:21:34
...

 

kafka下载:http://kafka.apache.org/downloads

解压下载下来的文件,bin目录下是常用命令,config目录下是配置文件。

kafka已经内置了一个zookeeper环境,可以直接使用。

建议kafka在linux环境下使用。

 

Kafka的简单测试

以下是在windows环境下的一个kafka测试:

配置则直接使用默认的配置即可。

1,启动zookeeper:

进入bin/windows目录,运行命令:

>zookeeper-server-start.bat  ../../config/zookeeper.properties

2,启动kafka:

重新打开一个窗口,进入bin/windows目录,运行命令:

>kafka-server-start.bat ../../config/server.properties

3,创建一个Topic:

重新打开一个窗口,进入bin/windows目录,运行命令:

>kafka-topics.bat  -create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 -topic test

创建Topic完成后,可通过命令查看Topic:

>kafka-topics.bat -list --zookeeper localhost:2181

4,创建一个消息消费者:

重新打开一个窗口,进入bin/windows目录,运行命令:

>kafka-console-consumer.bat  --bootstrap-server localhost:9092 --topic test --from-beginning

执行命令后,暂时不会有任何信息返回,消费者在等待信息。

5,创建一个消息生产者:

重新打开一个窗口,进入bin/windows目录,运行命令:

>kafka-console-producer.bat  --broker-list localhost:9092 --topic test

此时输入信息,然后按回车键。则完成了一条消息的生产,此时可看到在消息消费者窗口,会打印出刚刚生产的消息。

此时一个Kafka的使用实例完成。

 

Java代码完成Kafka测试

也可以直接通过Java代码来完成上述过程:

如上面步骤3:创建一个Topic,Java代码如下:

public static void main(String[] args) {
		// TODO Auto-generated method stub

		Properties prop = new Properties();
		prop.put("bootstrap.servers", "localhost:9092");
		
		AdminClient ac = AdminClient.create(prop);
		ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
		NewTopic topic = new NewTopic("topic-test2",1,(short)1);
		topics.add(topic);
		
		CreateTopicsResult result = ac.createTopics(topics);
		try {
			System.out.println("show create topics result:");
			result.all().get();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

 然后通过命令

>kafka-topics.bat -list --zookeeper localhost:2181

可查看刚刚通过程序新增的Topic。

 

创建一个消息生产者,

public void createMsg(){
		prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		Producer<String,String> p = new KafkaProducer<String,String>(prop);
		for(int i=0;i<50;i++){
			p.send(new ProducerRecord<String,String>("topic-test1","topictest message" + i));
		}
		p.close();
	}

 

生产的消息能够通过命令查看:

>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic-test1  --from-beginning

 

创建一个消息消费者,把收到的消息信息展示出来:

public void doconsume(){
		prop.put("group.id", "test");
		prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		final KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(prop);
		consumer.subscribe(Arrays.asList("topic-test1"),new ConsumerRebalanceListener(){

			public void onPartitionsRevoked(
					Collection<TopicPartition> partitions) {
				// TODO Auto-generated method stub
				
			}

			public void onPartitionsAssigned(
					Collection<TopicPartition> partitions) {
				// TODO Auto-generated method stub
				consumer.seekToBeginning(partitions);
				}
		
		});
		
		while(true){
			ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
			for(ConsumerRecord<String,String> record: records)
				System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());
		}
	}