欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka原生API

程序员文章站 2022-03-26 21:38:54
...

一、依赖

<dependencies>
        <!-- kafka依赖 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.1.1</version>
        </dependency>

        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-nop</artifactId>
             <version>1.7.2</version>
         </dependency>
    </dependencies>

二、生产端

1、生产端执行代码

package producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class OneProducer {
    // 第一个泛型:当前生产者所生产消息的key
    // 第二个泛型:当前生产者所生产的消息本身
    private KafkaProducer<Integer, String> producer;

    //ProducerConfig类包含properties中的key
    public OneProducer() {
        Properties properties = new Properties();
        // 指定kafka集群
        properties.put("bootstrap.servers", "39.97.176.160:9092");
        // 指定key与value的序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 指定生产者每10条向broker发送一次
        //properties.put("batch.size", 10);
        // 指定生产者每50ms向broker发送一次
        //properties.put("linger.ms", 50);
        this.producer = new KafkaProducer<Integer, String>(properties);
    }

    public void sendMsg() {
        // 创建消息记录(包含主题、消息本身)  (String topic, V value)
        //ProducerRecord<Integer, String> record = new ProducerRecord<>("test", "tianjin");
        // 创建消息记录(包含主题、key、消息本身)  (String topic, K key, V value)
        // ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, "tianjin");
        // 创建消息记录(包含主题、partition、key、消息本身)  (String topic, Integer partition, K key, V value)
        ProducerRecord<Integer, String> record = new ProducerRecord<>("test","huNan");
        producer.send(record);
    }

    //有结果回调
    public void sendMsgTwo() {
        ProducerRecord<Integer, String> record = new ProducerRecord<>("test", "shengzhen");
        producer.send(record, (metadata, ex) -> {
            System.out.println("topic = " + metadata.topic());
            System.out.println("partition = " + metadata.partition());
            System.out.println("offset = " + metadata.offset());
        });
    }

}

2、生产端执行入口

package producer;

import java.io.IOException;

public class OneProducerTest {

    public static void main(String[] args) throws IOException {
        OneProducer producer = new OneProducer();
        producer.sendMsg();
        System.in.read();
    }
}

三、消费端

1、消费端执行代码

package consumer;

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class OneConsumer extends ShutdownableThread {
    private KafkaConsumer<Integer, String> consumer;

    //ConsumerConfig类包含properties的Key
    public OneConsumer() {
        // 两个参数:
        // 1)指定当前消费者名称
        // 2)指定消费过程是否会被中断
        super("KafkaConsumerTest", false);
        //ConsumerConfig
        Properties properties = new Properties();
        String brokers = "39.97.176.160:9092";
        // 指定kafka集群
        properties.put("bootstrap.servers", brokers);
        // 指定消费者组ID
        properties.put("group.id", "cityGroup1");
        // 开启自动提交,默认为true
        properties.put("enable.auto.commit", "true");
        // 指定自动提交的超时时限,默认5s
        properties.put("auto.commit.interval.ms", "1000");
        // 指定消费者被broker认定为挂掉的时限。若broker在此时间内未收到当前消费者发送的心跳,则broker
        // 认为消费者已经挂掉。默认为10s
        properties.put("session.timeout.ms", "30000");
        // 指定两次心跳的时间间隔,默认为3s,一般不要超过session.timeout.ms的 1/3
        properties.put("heartbeat.interval.ms", "10000");
        // 当kafka中没有指定offset初值时,或指定的offset不存在时,从这里读取offset的值。其取值的意义为:
        // earliest:指定offset为第一条offset
        // latest: 指定offset为最后一条offset
        properties.put("auto.offset.reset", "earliest");
        // 指定key与value的反序列化器
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        this.consumer = new KafkaConsumer<Integer, String>(properties);
    }

    @Override
    public void doWork() {
        // 订阅消费主题
        consumer.subscribe(Collections.singletonList("test"));
        // 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。
        // 0,表示没有消息什么也不返回
        // >0,表示当时间到后仍没有消息,则返回空
        ConsumerRecords<Integer, String> records = consumer.poll(4000);
        for(ConsumerRecord record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
        }
    }
}

2、消费端执行入口

package consumer;

public class OneConsumerTest {
    public static void main(String[] args) {
        OneConsumer consumer = new OneConsumer();
        consumer.start();
    }
}

三、问题记录

Kafka原生API

listeners:配置阿里云内网ip

advertised.listeners:配置阿里云外网ip

Kafka原生API

zookeeper.connect:配置阿里云外网ip

相关标签: Kafka