Kafka原生API
程序员文章站
2022-03-26 21:38:54
...
一、依赖
<dependencies>
<!-- kafka依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.2</version>
</dependency>
</dependencies>
二、生产端
1、生产端执行代码
package producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class OneProducer {
// 第一个泛型:当前生产者所生产消息的key
// 第二个泛型:当前生产者所生产的消息本身
private KafkaProducer<Integer, String> producer;
//ProducerConfig类包含properties中的key
public OneProducer() {
Properties properties = new Properties();
// 指定kafka集群
properties.put("bootstrap.servers", "39.97.176.160:9092");
// 指定key与value的序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 指定生产者每10条向broker发送一次
//properties.put("batch.size", 10);
// 指定生产者每50ms向broker发送一次
//properties.put("linger.ms", 50);
this.producer = new KafkaProducer<Integer, String>(properties);
}
public void sendMsg() {
// 创建消息记录(包含主题、消息本身) (String topic, V value)
//ProducerRecord<Integer, String> record = new ProducerRecord<>("test", "tianjin");
// 创建消息记录(包含主题、key、消息本身) (String topic, K key, V value)
// ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, "tianjin");
// 创建消息记录(包含主题、partition、key、消息本身) (String topic, Integer partition, K key, V value)
ProducerRecord<Integer, String> record = new ProducerRecord<>("test","huNan");
producer.send(record);
}
//有结果回调
public void sendMsgTwo() {
ProducerRecord<Integer, String> record = new ProducerRecord<>("test", "shengzhen");
producer.send(record, (metadata, ex) -> {
System.out.println("topic = " + metadata.topic());
System.out.println("partition = " + metadata.partition());
System.out.println("offset = " + metadata.offset());
});
}
}
2、生产端执行入口
package producer;
import java.io.IOException;
public class OneProducerTest {
public static void main(String[] args) throws IOException {
OneProducer producer = new OneProducer();
producer.sendMsg();
System.in.read();
}
}
三、消费端
1、消费端执行代码
package consumer;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class OneConsumer extends ShutdownableThread {
private KafkaConsumer<Integer, String> consumer;
//ConsumerConfig类包含properties的Key
public OneConsumer() {
// 两个参数:
// 1)指定当前消费者名称
// 2)指定消费过程是否会被中断
super("KafkaConsumerTest", false);
//ConsumerConfig
Properties properties = new Properties();
String brokers = "39.97.176.160:9092";
// 指定kafka集群
properties.put("bootstrap.servers", brokers);
// 指定消费者组ID
properties.put("group.id", "cityGroup1");
// 开启自动提交,默认为true
properties.put("enable.auto.commit", "true");
// 指定自动提交的超时时限,默认5s
properties.put("auto.commit.interval.ms", "1000");
// 指定消费者被broker认定为挂掉的时限。若broker在此时间内未收到当前消费者发送的心跳,则broker
// 认为消费者已经挂掉。默认为10s
properties.put("session.timeout.ms", "30000");
// 指定两次心跳的时间间隔,默认为3s,一般不要超过session.timeout.ms的 1/3
properties.put("heartbeat.interval.ms", "10000");
// 当kafka中没有指定offset初值时,或指定的offset不存在时,从这里读取offset的值。其取值的意义为:
// earliest:指定offset为第一条offset
// latest: 指定offset为最后一条offset
properties.put("auto.offset.reset", "earliest");
// 指定key与value的反序列化器
properties.put("key.deserializer",
"org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<Integer, String>(properties);
}
@Override
public void doWork() {
// 订阅消费主题
consumer.subscribe(Collections.singletonList("test"));
// 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。
// 0,表示没有消息什么也不返回
// >0,表示当时间到后仍没有消息,则返回空
ConsumerRecords<Integer, String> records = consumer.poll(4000);
for(ConsumerRecord record : records) {
System.out.println("topic = " + record.topic());
System.out.println("partition = " + record.partition());
System.out.println("key = " + record.key());
System.out.println("value = " + record.value());
}
}
}
2、消费端执行入口
package consumer;
public class OneConsumerTest {
public static void main(String[] args) {
OneConsumer consumer = new OneConsumer();
consumer.start();
}
}
三、问题记录
listeners:配置阿里云内网ip
advertised.listeners:配置阿里云外网ip
zookeeper.connect:配置阿里云外网ip
推荐阅读
-
微信小程序map组件结合高德地图API实现wx.chooseLocation功能示例
-
原生JS实现逼真的图片3D旋转效果详解
-
使用android隐藏api实现亮度调节的方法
-
Activex在没有电子秤api的情况下获取串口数据
-
神经网络API、Kotlin支持,那些你必须知道的Android 8.1预览版和Android Studio 3.0新特性
-
百度地图API应用之获取用户的具体位置
-
python使用在线API查询IP对应的地理位置信息实例
-
Kafka 常用命令行详细介绍及整理
-
循序渐进学.Net Core Web Api开发系列【4】:前端访问WebApi
-
【从零开始搭建自己的.NET Core Api框架】(三)集成轻量级ORM——SqlSugar:3.3 自动生成实体类