欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka producer 生产者API操作

程序员文章站 2022-06-14 13:38:25
...

一、准备工作

  • 在IDE上创建maven项目,pom文件添加依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>
  • 启动zookeeper集群
bin/zkServer.sh start
  • 启动kafka集群
bin/kafka-server-start.sh -daemon config/server.properties
  • kafka集群开一个消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bigdata 
--from-beginning

二、创建一个普通的生产者

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        /* 相当于map操作 */
        Properties properties = new Properties();

        /* kafka 集群,broker-list */
        properties.put("bootstrap.servers", "centos7-1:9092");

        /* 等待所有副本节点的应答 */
        properties.put("acks", "all");

        /* 重试次数 */
        properties.put("retries", Integer.valueOf(3));

        /* 批次大小 */
        properties.put("batch.size", Integer.valueOf(16384));

        /* 等待时间 */
        properties.put("linger.ms", Integer.valueOf(1));

        /* RecordAccumulator 缓冲区大小 */
        properties.put("buffer.memory", Integer.valueOf(33554432));

        /* key序列化 */
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /* value序列化 */
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        /* 创建生产者对象 */
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer(properties);

        /* 发送数据 */
        for (int i = 0; i < 10; i++) {
            /* 发送的主题、key、value */
            kafkaProducer.send(new ProducerRecord("bigdata", "jh","jh==" + i));
        }

        /* 关闭资源 */
        kafkaProducer.close();
    }
}

效果展示,集群开启的消费者:
kafka producer 生产者API操作

三、创建带回调函数的生产者

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import java.util.Properties;
import org.apache.kafka.clients.producer.*;

public class CallBackProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();

        /* kafka 集群 broker-list */
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos7-1:9092");

        /* 等待所有副本节点的应答 */
        properties.put("acks", "-1");

        /* 重试次数 */
        properties.put("retries", Integer.valueOf(3));

        /* 批次大小 */
        properties.put("batch.size", Integer.valueOf(16384));

        /* 等待时间 */
        properties.put("linger.ms", Integer.valueOf(1));

        /* RecordAccumulator 缓冲区大小 */
        properties.put("buffer.memory", Integer.valueOf(33554432));

        /* key序列化 */
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        /* value序列化 */
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        /* 创建生产者对象 */
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer(properties);

        /* 发送数据 */
        for (int i = 0; i < 100000; i++) {
            /* 发送的主题、key、value 回调函数*/
            kafkaProducer.send(new ProducerRecord("backtest", "jh","jh===>" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    /* 如果发送成功,输出分区、offset */
                    if (exception == null) {
                        System.out.println(metadata.partition() + "--" + metadata.offset());
                    } else {
                        /* 如果发送失败,打印报错信息 */
                        exception.printStackTrace();
                    }
                }
            });
        }
        /* 关闭资源 */
        kafkaProducer.close();
    }
}

send方法源码:可以指定分区存储

  @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

public class ProducerRecord<K, V> {

    private final String topic;
    /* 这里可以指定分区 */
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

运行完的部分,默认是hash分区的,并且可以看出是区内有序的

0--188
0--189
0--190
0--191
0--192
0--193
2--2467
2--2468
2--2469
2--2470
2--2471
2--2472

四、创建自定义分区的生产者

创建一个类实现Partitioner

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

public class Mypartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /* 这里写自己对应的业务逻辑 */
//        List<PartitionInfo> mydata = cluster.availablePartitionsForTopic("mydata");
//        Integer mydata1 = cluster.partitionCountForTopic("mydata");

        return 0;
    }
    @Override
    public void close() {
    }
    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {

    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducerPartition {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        /* 相当于map操作 */
        Properties properties = new Properties();

        /* kafka 集群,broker-list */
        properties.put("bootstrap.servers", "centos7-1:9092");

        /* 等待所有副本节点的应答 */
        properties.put("acks", "all");

        /* 重试次数 */
        properties.put("retries", Integer.valueOf(3));

        /* 批次大小 */
        properties.put("batch.size", Integer.valueOf(16384));

        /* 等待时间 */
        properties.put("linger.ms", Integer.valueOf(1));

        /* RecordAccumulator 缓冲区大小 */
        properties.put("buffer.memory", Integer.valueOf(33554432));

        /* key序列化 */
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /* value序列化 */
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        /* 添加自定义分区 */
        properties.put("partitioner.class", "com.gu.Mypartition");

        /* 创建生产者对象 */
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer(properties);

        /* 发送数据 */
        for (int i = 0; i < 10; i++) {
            /* 发送的主题、key、value */
            kafkaProducer.send(new ProducerRecord("bigdata", "jh==" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    /* 如果发送成功,输出分区、offset */
                    if (exception == null) {
                        System.out.println(metadata.partition() + "--" + metadata.offset());
                    } else {
                        /* 如果发送失败,打印报错信息 */
                        exception.printStackTrace();
                    }
                }
            });
        }

        /* 关闭资源 */
        kafkaProducer.close();
    }
}
相关标签: kafka kafka java