Kafka Producer API Operations
1. Preparation
- Create a Maven project in your IDE and add the kafka-clients dependency to the pom file:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>
- Start the ZooKeeper cluster:
bin/zkServer.sh start
- Start the Kafka cluster:
bin/kafka-server-start.sh -daemon config/server.properties
- Start a console consumer against the cluster:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bigdata --from-beginning
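If the bigdata topic does not exist yet, create it first. A sketch for a Kafka 1.1.x cluster, which still manages topics through ZooKeeper; the partition count, replication factor, and ZooKeeper address are assumptions to adjust for your setup:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic bigdata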
2. Creating a Basic Producer
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        /* Properties is essentially a map of config entries */
        Properties properties = new Properties();
        /* Kafka cluster broker list */
        properties.put("bootstrap.servers", "centos7-1:9092");
        /* wait for acknowledgement from all in-sync replicas */
        properties.put("acks", "all");
        /* number of retries */
        properties.put("retries", 3);
        /* batch size in bytes */
        properties.put("batch.size", 16384);
        /* linger time in milliseconds */
        properties.put("linger.ms", 1);
        /* RecordAccumulator buffer size in bytes (32 MB) */
        properties.put("buffer.memory", 33554432);
        /* key serializer */
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /* value serializer */
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /* create the producer */
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        /* send data */
        for (int i = 0; i < 10; i++) {
            /* topic, key, value */
            kafkaProducer.send(new ProducerRecord<>("bigdata", "jh", "jh==" + i));
        }
        /* close the producer, flushing any buffered records */
        kafkaProducer.close();
    }
}
Result: the messages jh==0 through jh==9 appear in the console consumer started during preparation.
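The send above is asynchronous: send() returns a Future<RecordMetadata>, and calling get() on it blocks until the broker acknowledges the record, which is why main declares InterruptedException and ExecutionException. A minimal sketch of a synchronous send with the same producer (the message text here is made up for illustration):

/* requires import org.apache.kafka.clients.producer.RecordMetadata */
/* blocks until the broker acks; get() throws if the send failed */
RecordMetadata metadata =
        kafkaProducer.send(new ProducerRecord<>("bigdata", "jh", "sync-demo")).get();
System.out.println("acked: partition=" + metadata.partition() + ", offset=" + metadata.offset());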
3. Creating a Producer with a Callback
The callback is invoked when the producer receives the ack, and it runs asynchronously. It takes two parameters, RecordMetadata and Exception: if Exception is null the send succeeded, and if it is not null the send failed.
Note: failed sends are retried automatically (up to the configured retries), so there is no need to retry manually inside the callback.
import java.util.Properties;
import org.apache.kafka.clients.producer.*;

public class CallBackProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /* Kafka cluster broker list */
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos7-1:9092");
        /* "-1" is equivalent to "all": wait for all in-sync replicas */
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        /* number of retries */
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        /* batch size in bytes */
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        /* linger time in milliseconds */
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        /* RecordAccumulator buffer size in bytes (32 MB) */
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        /* key serializer */
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        /* value serializer */
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        /* create the producer */
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        /* send data, attaching a callback to each record */
        for (int i = 0; i < 100000; i++) {
            /* topic, key, value, plus the callback */
            kafkaProducer.send(new ProducerRecord<>("backtest", "jh", "jh===>" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        /* on success, print partition and offset */
                        System.out.println(metadata.partition() + "--" + metadata.offset());
                    } else {
                        /* on failure, print the error */
                        exception.printStackTrace();
                    }
                }
            });
        }
        /* close the producer, flushing any buffered records */
        kafkaProducer.close();
    }
}
Source of the send method; ProducerRecord shows that a target partition can be specified explicitly:
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
public class ProducerRecord<K, V> {
    private final String topic;
    /* an explicit partition can be set here */
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
    // ...
}
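For example, ProducerRecord offers a constructor taking (topic, partition, key, value); a minimal sketch that pins a record to partition 1 of the bigdata topic (the partition number is an arbitrary choice for illustration):

/* topic, partition, key, value: this record goes to partition 1, bypassing the partitioner */
kafkaProducer.send(new ProducerRecord<>("bigdata", 1, "jh", "pinned"));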
Partial output of a run. By default the partition is chosen by hashing the key (see the sketch after the output), and the offsets show that ordering is preserved within each partition:
0--188
0--189
0--190
0--191
0--192
0--193
2--2467
2--2468
2--2469
2--2470
2--2471
2--2472
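For reference, the default partitioner in kafka-clients picks the partition for a keyed record roughly as below. This is a simplified sketch of DefaultPartitioner (the real class also round-robins records that have no key), where topic, keyBytes, and cluster are the arguments of Partitioner.partition():

import org.apache.kafka.common.utils.Utils;

/* hash the serialized key with murmur2, force it non-negative, then mod by the partition count */
int numPartitions = cluster.partitionCountForTopic(topic);
int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;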
4. Creating a Producer with a Custom Partitioner
Create a class that implements the Partitioner interface:
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class Mypartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /* put your own routing logic here; Cluster exposes the topic metadata, e.g.: */
        // List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);
        // Integer partitionCount = cluster.partitionCountForTopic(topic);
        /* this demo sends every record to partition 0 */
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyProducerPartition {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        /* Properties is essentially a map of config entries */
        Properties properties = new Properties();
        /* Kafka cluster broker list */
        properties.put("bootstrap.servers", "centos7-1:9092");
        /* wait for acknowledgement from all in-sync replicas */
        properties.put("acks", "all");
        /* number of retries */
        properties.put("retries", 3);
        /* batch size in bytes */
        properties.put("batch.size", 16384);
        /* linger time in milliseconds */
        properties.put("linger.ms", 1);
        /* RecordAccumulator buffer size in bytes (32 MB) */
        properties.put("buffer.memory", 33554432);
        /* key serializer */
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /* value serializer */
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /* register the custom partitioner */
        properties.put("partitioner.class", "com.gu.Mypartition");
        /* create the producer */
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        /* send data */
        for (int i = 0; i < 10; i++) {
            /* topic and value only; there is no key, and the custom partitioner decides the partition */
            kafkaProducer.send(new ProducerRecord<>("bigdata", "jh==" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        /* on success, print partition and offset */
                        System.out.println(metadata.partition() + "--" + metadata.offset());
                    } else {
                        /* on failure, print the error */
                        exception.printStackTrace();
                    }
                }
            });
        }
        /* close the producer, flushing any buffered records */
        kafkaProducer.close();
    }
}
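Because Mypartition.partition() always returns 0, every callback line should report partition 0 for these ten sends (0--0 through 0--9 on a fresh topic); the offsets will be higher if the partition already holds data.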