欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka的生产者,消费者JavaApi操作实例

程序员文章站 2024-01-14 11:36:28
...

[提前声明]
文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章
写作不易,转载请注明,谢谢!
代码案例地址: ????https://github.com/Mydreamandreality/sparkResearch


Kafka JavaApi代码案例

首先引入我们的maven文件
这里注意version和服务器的kafka版本要一致

        <!--分布式消息队列-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.9.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>

Java编写生产者代码

首先我们先定义生产者需要的一些配置
Kafka的生产者,消费者JavaApi操作实例
代码:
具体的配置属性在注视中都有详细的解释

/**
 * Created by 張燿峰
 * Kafka消费者参数配置
 *
 * @author 孤
 * @date 2019/4/25
 * @Varsion 1.0
 */
public class ConstantConsumer {

    /**
     * 	用来唯一标识consumer进程所在组的字符串,
     * 	如果设置同样的group  id,表示这些processes都是属于同一个consumer  group
     */
    public static String GROUP_ID = "group.id";

    /**
     * 如果值为真,则为偏移启用自动落实,否则不提交。
     */
    public static String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    /**
     * consumer向zookeeper提交offset的频率,单位是秒
     */
    public static String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";

    /**
     * zookeeper 会话的超时限制。
     * 如果consumer在这段时间内没有向zookeeper发送心跳信息,则它会被认为挂掉了,并且reblance将会产生
     */
    public static String SESSION_TIMEOUT_MS = "session.timeout.ms";

    /**
     * zookeeper中没有初始化的offset时,如果offset是以下值的回应:
     * smallest:自动复位offset为smallest的offset
     * largest:自动复位offset为largest的offset
     * anything  else:向consumer抛出异常
     */
    public static String AUTO_OFFSET_RESET = "auto.offset.reset";

    public static String KEY_DESERIALIZER = "key.deserializer";

    public static String VALUE_DESERIALIZER = "value.deserializer";
}

定义好配置后开始编写生产者代码:

**
 * Created by 張燿峰
 * kafka生产者代码案例
 *
 * @author 孤
 * @date 2019/4/25
 * @Varsion 1.0
 */
public class KafkaProducer extends Thread{
    //定义kafka生产者
    private static org.apache.kafka.clients.producer.KafkaProducer producer;
    //定义主题
    private static final String TOPIC = "JavaKafka";
    
    //初始化Kafka生产者
    public KafkaProducer() {
        Properties properties = new Properties();
        properties.put(ConstantProducer.BOOTSTRAP_SERVERS, "192.168.253.132:9092");
        properties.put(ConstantProducer.ACKS, "all");
        properties.put(ConstantProducer.RETRIES, 0);
        properties.put(ConstantProducer.BATCH_SIZE, 16385);
        properties.put(ConstantProducer.LINGER_MS, 1);
        properties.put(ConstantProducer.BUFFER_MEMORY, 33554432);
        properties.put(ConstantProducer.KEY_SERIALIZER_CLASS, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ConstantProducer.VALUE_SERIALIZER_CLASS, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new org.apache.kafka.clients.producer.KafkaProducer(properties);
    }
    
    //生产消息
    public void sendProducer() {
        for (int i = 0; i < 10; i++) {
            String key = String.valueOf(i);
            String data  = "hello message "+ key;
            producer.send(new ProducerRecord<>(TOPIC,key,data));
            System.out.println("SUCCESS~");
        }
        producer.close();
    }

    public static void main(String[] args) {
        new KafkaProducer().sendProducer();
    }
}

从上面的代码中可以看到,创建生产者大概分为这么几步

  • 首先定义kafka生产者客户端
  • 然后定义我们的主题
  • 接着配置初始kafka生产者需要使用的参数
  • 最后使用 producer.send生产消息
  • 最后记得关闭资源

运行生产者示例如下:
Kafka的生产者,消费者JavaApi操作实例

Java编写消费者代码

首先我们先定义消费者需要的一些配置
Kafka的生产者,消费者JavaApi操作实例
代码:
具体的配置属性在注视中都有详细的解释

package v1.Constant;

/**
 * Created by 張燿峰
 * Kafka消费者参数配置
 *
 * @author 孤
 * @date 2019/4/25
 * @Varsion 1.0
 */
public class ConstantConsumer {

    /**
     * 	用来唯一标识consumer进程所在组的字符串,
     * 	如果设置同样的group  id,表示这些processes都是属于同一个consumer  group
     */
    public static String GROUP_ID = "group.id";

    /**
     * 如果值为真,则为偏移启用自动落实,否则不提交。
     */
    public static String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    /**
     * consumer向zookeeper提交offset的频率,单位是秒
     */
    public static String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";

    /**
     * zookeeper 会话的超时限制。
     * 如果consumer在这段时间内没有向zookeeper发送心跳信息,则它会被认为挂掉了,并且reblance将会产生
     */
    public static String SESSION_TIMEOUT_MS = "session.timeout.ms";

    /**
     * zookeeper中没有初始化的offset时,如果offset是以下值的回应:
     * smallest:自动复位offset为smallest的offset
     * largest:自动复位offset为largest的offset
     * anything  else:向consumer抛出异常
     */
    public static String AUTO_OFFSET_RESET = "auto.offset.reset";

    public static String KEY_DESERIALIZER = "key.deserializer";

    public static String VALUE_DESERIALIZER = "value.deserializer";
}

定义好配置后开始编写消费者代码:

package v1;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import v1.Constant.ConstantConsumer;
import v1.Constant.ConstantProducer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Created by 張燿峰
 * kafka消费者代码案例
 *
 * @author 孤
 * @date 2019/4/25
 * @Varsion 1.0
 */
public class KafkaConsumers extends Thread{
    //定义kafka消费者
    private static KafkaConsumer<String, String> consumer;
    
    //定义主题
    private static final String TOPIC = "JavaKafka";
    
    //初始化kafka消费者
    public KafkaConsumers() {
        Properties properties = new Properties();
        properties.put(ConstantProducer.BOOTSTRAP_SERVERS, "192.168.253.132:9092");
        properties.put(ConstantConsumer.GROUP_ID, "test-consumer-group");
        properties.put(ConstantConsumer.ENABLE_AUTO_COMMIT, "true");
        properties.put(ConstantConsumer.AUTO_COMMIT_INTERVAL_MS, 1000);
        properties.put(ConstantConsumer.SESSION_TIMEOUT_MS, 30000);
        properties.put(ConstantConsumer.AUTO_OFFSET_RESET, "earliest");
        properties.put(ConstantConsumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConstantConsumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(properties);
    }
    
    //消费消息
    public void getConsumers() {
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            //TODO 成功执行后 输出文档~
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecords : records) {
                System.out.println("key:" + consumerRecords.key() + ", value: " + consumerRecords.value() + ", topic: " + consumerRecords.topic());
            }
        }
    }

    public static void main(String[] args) {
        new KafkaConsumers().getConsumers();
    }
}

如上代码:整体的逻辑和生产者是一样的

  • 定义消费者,定义主题,初始化消费者,消费消息

采坑事项:

  • ConstantConsumer.GROUP_ID 这个参数的value需要和服务器kafka/config/consumer.properties文件中的group.id=test-consumer-group Value一致,否则会造成消费者无法消费的情况
    Kafka的生产者,消费者JavaApi操作实例
  • 在运行代码前需要保证服务器的zookeeper和kafka服务正常启动
    Kafka的生产者,消费者JavaApi操作实例
  • 运行环境和服务器环境的kafka版本要保持一致:
    Kafka的生产者,消费者JavaApi操作实例