欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java实现Kafka的生产者和消费者例子

程序员文章站 2024-02-01 17:53:04
Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者。 生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键、值进行保存。 每一个Topic中都包含一个或多个物理分区(Partition),分区维护着消息... ......

kafka的结构与rabbitmq类似,消息生产者向kafka服务器发送消息,kafka接收消息后,再投递给消费者。
生产者的消费会被发送到topic中,topic中保存着各类数据,每一条数据都使用键、值进行保存。
每一个topic中都包含一个或多个物理分区(partition),分区维护着消息的内容和索引,它们有可能被保存在不同服务器。

新建一个maven项目,pom.xml 加入依赖:

        <dependency>
            <groupid>org.apache.kafka</groupid>
            <artifactid>kafka-clients</artifactid>
            <version>2.3.0</version>
        </dependency>

1、编写生产者

将消息投递到kafka服务器的名称为“topic1”的topic中

package com.example.kafkatest;

import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.common.serialization.stringserializer;

import java.util.properties;

public class producer {
    public static void main(string[] args) {
        //配置信息
        properties props = new properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "localhost:9092");
        //设置数据key和value的序列化处理类
        props.put("key.serializer", stringserializer.class);
        props.put("value.serializer", stringserializer.class);
        //创建生产者实例
        kafkaproducer<string,string> producer = new kafkaproducer<>(props);
        producerrecord record = new producerrecord<string, string>("topic1", "username", "lc");
        //发送记录
        producer.send(record);
        producer.close();
    }
}

运行后,可打开命令行工具,进入kafka目录,执行命令查询服务器的topic:

bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

结果如下:

Java实现Kafka的生产者和消费者例子


2、编写消费者

本例中,消费者和生产者在同一个项目中,只是使用不同的启动类。
消费者会为自已指定一个消费者组的标识,每一条发布到topic的记录,都会被交付给消费者组的一个消费者实例。
如果多个消费者实例有相同的消费者组,则这些记录会分配到各个消费者实例上,以达到负载均衡的目录。
如果所有的消费者有不同的消费者组,则每一条记录都会广播到全部的消费者进行处理。

package com.example.rabbittest;

import org.apache.kafka.clients.consumer.consumerrecord;
import org.apache.kafka.clients.consumer.consumerrecords;
import org.apache.kafka.clients.consumer.kafkaconsumer;
import org.apache.kafka.common.serialization.stringdeserializer;

import java.time.duration;
import java.util.arrays;
import java.util.properties;

public class consumer {
    public static void main(string[] args) {
        //配置信息
        properties props = new properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "localhost:9092");
        //必须指定消费者组
        props.put("group.id", "test");
        //设置数据key和value的序列化处理类
        props.put("key.deserializer", stringdeserializer.class);
        props.put("value.deserializer", stringdeserializer.class);
        //创建消息者实例
        kafkaconsumer<string,string> consumer = new kafkaconsumer<>(props);
        //订阅topic1的消息
        consumer.subscribe(arrays.aslist("topic1"));
        //到服务器中读取记录
        while (true){
            consumerrecords<string,string> records = consumer.poll(duration.ofmillis(100));
            for(consumerrecord<string,string> record : records){
                system.out.println("key:" + record.key() + "" + ",value:" + record.value());
            }
        }
    }
}

运行后,idea控制台其中输出如下:

Java实现Kafka的生产者和消费者例子