Kafka生产者(Producer) API 生产数据
程序员文章站
2022-06-14 13:39:25
...
小编对以下的四种情况进行讨论:
- 过时的API实现
- 新API实现
- 带有回调函数的API实现
- 自定义分区的API实现
所用的pom文件如下所示:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
</dependency>
</dependencies>
过时的API实现
package com.myStudy.producer;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.KeyedMessage$;
import kafka.producer.ProducerConfig;
//import org.apache.kafka.clients.producer.Producer; // 新的Kafka API
import java.util.Properties;
/**
* kafka 生产者 过时的API
*/
public class OldProducer {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
Properties properties = new Properties();
// Properties() 继承自HashTable HashTable 又继承自Map 所以这里用的是Map的put来放数据
// 1设置zookeeper的地址
properties.put("metadata.broker.list", "hodoop01:9092");
// ack 等级为1 的时候,此时只是保证leader将数据写入本地的log中,无需等待所有follower写入完成
// ack 的等级有 0 1 all 三个等级 0的意思是不会等待任何的确认,数据发送到socket缓冲区中认为已经发送
// all 是等所有的leader和follower都保存好数据之后
// 2.设置kafka的ack等级
properties.put("request.require.ack", 1);
// 3.设置序列化
properties.put("serializer", "kafka.serializer.StringEncoder");
// kafka 1.x当中直接使用的是Map的方法
// 4.创建生产者
Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties));
// 5.kafka 消息的topic 和 消息的内容设置
KeyedMessage message = new KeyedMessage<Integer, String>("first", "helloworld");
// 6.发送消息
producer.send(message);
}
}
新API实现
package com.myStudy.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* kafka 新的消费者API 用的是KafkaProducer
*/
public class NewProducer {
public static void main(String[] args) {
// 1.创建配置对象
Properties properties = new Properties();
// 2.设置配置信息
// kafka 服务端的主机名和端口号
properties.put("bootstrap.servers", "hadoop03:9092");
// 等待所有副本的应答
properties.put("acks", "all");
// 消息发送最大的尝试次数
properties.put("retries", 0);
// 一批消息的处理大小
properties.put("batch.size", 16384);
// 请求的延迟
properties.put("linger.ms", 1);
// 发送缓冲区内存大小
properties.put("buffer.size", 33554432);
// key 序列化
properties.put("key.serializer", "org.apache.kafka.common.serilization.StringSerilizer");
// value 序列化
properties.put("value.serializer", "org.apache.kafka.common.serilization.StringSerilizer");
// KafkaProducer 有多个构造方法,可以用Map来进行社会参数,也可在构造方法中进行设置序列化
KafkaProducer producer = new KafkaProducer<String, String>(properties);
// 3生产数据
for (int i = 0; i < 30; i++) {
// ProducerRecord 中还可以设置 topic partition 时间戳 header 等信息
producer.send(new ProducerRecord("first", Integer.toString(i), "helllo world-" + i));
}
// 4关闭生产者
producer.close();
}
}
带有回调函数的API实现
package com.myStudy.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class CallBackProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka服务端的主机名和端口号
props.put("bootstrap.servers", "hadoop03:9092");
// 等待所有副本节点的应答
props.put("acks", "all");
// 消息发送最大尝试次数
props.put("retries", 0);
// 一批消息处理大小
props.put("batch.size", 16384);
// 增加服务端请求延时
props.put("linger.ms", 1);
// 发送缓存区内存大小
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 30; i++) {
producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello" + i));
}
// 源码中设置的是:user-supplied callback to execute when the record has been acknowledged by the server
// 设置记录被确认的回调函数
producer.send(new ProducerRecord<String, String>("first", "hello kafka"), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
// 获取分区和偏移量
System.out.println(metadata.partition() + "---" + metadata.offset());
}
}
});
producer.close();
}
}
自定义分区的API实现
1.自定义分区类
package com.myStudy.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 自定义分区生产者:
* 需求:将所有的数据存储到topic0号分区上面去
* 实现分区接口重写分区方法
*/
public class CustomPartition implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 控制分区
return 0;
}
public void close() {
}
public void configure(Map<String, ?> configs) {
}
}
2.使用自定义分区类
package com.myStudy.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* 用自定义的分区来实现分区的控制
*/
public class CustomerPartitionProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka服务端的主机名和端口号
props.put("bootstrap.servers", "hadoop03:9092");
// 等待所有副本节点的应答
props.put("acks", "all");
// 消息发送最大尝试次数
props.put("retries", 0);
// 一批消息处理大小
props.put("batch.size", 16384);
// 增加服务端请求延时
props.put("linger.ms", 1);
// 发送缓存区内存大小
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置启用自定义分区
props.put("partitioner.class", CustomPartition.class);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<String, String>("first", "1", "hello kafka"));
producer.close();
}
}
小编的生产者的API如上面所示
推荐阅读
-
Python 使用python-kafka类库开发kafka生产者&消费者&客户端
-
kafka的生产者和消费者代码示例
-
springboot配置kafka生产者和消费者详解
-
大数据时代的生产者迷思:*怎么了?
-
从源码分析如何优雅的使用 Kafka 生产者
-
Kafka生产者案例报警告SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
-
kafka命令行生产者消费者测试
-
Kafka 生产者消费者模式 写入/读取数据 [ 命令行/Java 代码 ]
-
kafka producer 生产者客户端参数配置
-
Kafka学习(一)生产者producer(个人规范用法)