5. Kafka API operations: producer and consumer, new and old APIs
0. Use cases:
- High-level API: everyday use
- Low-level API: troubleshooting
I. Producer:
1. Start a consumer and leave it waiting (listening on the first topic).
On node01:
cd /export/servers/kafka_2.11-0.11.0.0
bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic first
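Note: the --zookeeper flag starts the old console consumer; on 0.11 the new consumer works as well. Assuming the broker listens on node01:9092, the equivalent command is:
bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic first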
2. Create a Java project:
pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.heima</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- new API -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- old API -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.0</version>
        </dependency>
    </dependencies>
</project>
3. Create a producer (deprecated API)
- Code:
package kafkatest;

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class OldProder {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "node01:9092");
        properties.put("request.required.acks", "1");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        // create the producer
        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties));
        // build the message
        KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");
        // send it
        producer.send(message);
        producer.close();
    }
}
- Run it
- Result: the consumer receives the message
4. Create a producer (new API)
- Code:
package kafkatest;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProder {
    public static void main(String[] args) {
        // configuration
        Properties props = new Properties();
        // Kafka broker host and port
        props.put("bootstrap.servers", "node01:9092");
        // wait for acknowledgement from all replicas
        props.put("acks", "all");
        // maximum number of send retries
        props.put("retries", 0);
        // batch size in bytes
        props.put("batch.size", 16384);
        // batching delay (linger)
        props.put("linger.ms", 1);
        // send buffer size in bytes
        props.put("buffer.memory", 33554432);
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // create the producer
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // send messages
        for (int i = 0; i < 50; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
        }
        // release resources
        producer.close();
    }
}
- Run it
- Result: the consumer receives the messages
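send() is asynchronous. If the program must confirm delivery before continuing, it can block on the Future that send() returns. A minimal sketch (the class name SyncProder is ours, for illustration):

package kafkatest;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncProder {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // send() returns a Future<RecordMetadata>; get() blocks until the broker acknowledges
        RecordMetadata meta = producer.send(new ProducerRecord<String, String>("first", "0", "hello sync")).get();
        System.out.println("partition = " + meta.partition() + ", offset = " + meta.offset());
        producer.close();
    }
}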
5. Create a producer with a callback:
- Goal: invoke a method once a message has been sent successfully
- Code:
package kafkatest;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallBackProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker host and port
        props.put("bootstrap.servers", "node01:9092");
        // wait for acknowledgement from all replicas
        props.put("acks", "all");
        // maximum number of send retries
        props.put("retries", 0);
        // batch size in bytes
        props.put("batch.size", 16384);
        // batching delay (linger)
        props.put("linger.ms", 1);
        // send buffer size in bytes
        props.put("buffer.memory", 33554432);
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (metadata != null) {
                        // partition number and the record's offset
                        System.out.println(metadata.partition() + "---" + metadata.offset());
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}
- Run it: click the green arrow in IDEA
- Result: check the console (each record's partition and its offset within that partition)
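Note that the callback above only checks metadata; on a failed send, metadata is null and exception is non-null, so checking exception is safer. With Java 8 (an assumption; the project above does not pin a Java version), Callback is a single-method interface, so the send call inside the loop can be shortened to a lambda:

kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), (metadata, exception) -> {
    if (exception != null) {
        // delivery failed
        exception.printStackTrace();
    } else {
        System.out.println(metadata.partition() + "---" + metadata.offset());
    }
});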
6. Create a producer with a custom partitioner
- Requirement: route every record to partition 0 of the topic
- Approach: define a partitioner class first, then plug it into the producer
- Custom partitioner:
package kafkatest;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // force every record into partition 0
        return 0;
    }

    public void close() {
    }
}
- Producer:
package kafkatest;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker host and port
        props.put("bootstrap.servers", "node01:9092");
        // wait for acknowledgement from all replicas
        props.put("acks", "all");
        // maximum number of send retries
        props.put("retries", 0);
        // batch size in bytes
        props.put("batch.size", 16384);
        // batching delay (linger)
        props.put("linger.ms", 1);
        // send buffer size in bytes
        props.put("buffer.memory", 33554432);
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // register the custom partitioner (the key line)
        props.put("partitioner.class", "kafkatest.CustomPartitioner");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("first", "1", "sflsfl"));
        producer.close();
    }
}
- Run it
- Verify:
cd /export/servers/kafka_2.11-0.11.0.0/logs/first-0
All the records landed in partition 0
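To inspect what was actually written, the partition's segment file can be dumped with the DumpLogSegments tool (the segment file name below is the typical first segment; the actual name on disk may differ):
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /export/servers/kafka_2.11-0.11.0.0/logs/first-0/00000000000000000000.log --print-data-log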
II. Consumer:
1. Consumer, high-level API (the common case)
- Start a producer in the console:
cd /export/servers/kafka_2.11-0.11.0.0
bin/kafka-console-producer.sh --broker-list node01:9092 --topic first
hello
- Old API:
package kafkatest;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class CustomConsumer {
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "node01:2181");
        properties.put("group.id", "g1");
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        properties.put("auto.commit.interval.ms", "1000");
        // create the consumer connector
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        // one stream for the "first" topic
        HashMap<String, Integer> topicCount = new HashMap<>();
        topicCount.put("first", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);
        KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}
- New API: offsets are tracked automatically
package kafkatest;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker addresses; this does not need to list every broker
        props.put("bootstrap.servers", "node01:9092");
        // consumer group
        props.put("group.id", "test");
        // commit offsets automatically?
        props.put("enable.auto.commit", "true");
        // auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // key deserializer class
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value deserializer class
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // subscribe; several topics can be listed at once
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            // poll for data with a 100 ms timeout
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
- Run it
- Send messages to Kafka from node01
- Result: the messages from Kafka show up in IDEA
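With enable.auto.commit set to "true" above, offsets are committed on a timer, so a crash between commits can replay or skip records. Turning auto-commit off and committing manually after processing is the safer variant; a minimal sketch of just the changed lines (the consumer is built and subscribed exactly as above):

props.put("enable.auto.commit", "false");
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    // commit only after the whole batch has been processed
    consumer.commitSync();
}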
2. Consumer, low-level API: when to use it
Rarely; mainly for troubleshooting
3. Consumer, low-level API: the steps

Step | What it does |
---|---|
1 | Find the leader replica for the given partition from the topic metadata |
2 | Fetch the partition's latest consumption offset |
3 | Pull the partition's messages from the leader replica |
4 | Detect leader changes and retry |
4. Consumer, low-level API: the methods

Method | What it does |
---|---|
findLeader() | The client sends a topic-metadata request to the seed brokers and saves the replica set as fallback nodes |
getLastOffset() | The consumer client sends an offset request to fetch the partition's most recent offset |
run() | The main method: the low-level consumer's message-pulling loop |
findNewLeader() | When the partition's leader replica fails, find the new leader |
5. How it works:
- Create the consumer object
- Find the leader replica:
if (the leader is healthy) {
    use it
} else {
    find the new leader
}
- Consume the data from the leader replica
6. Code:
package kafkatest;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {
    // scratch list: hosts of the replicas of the partition we are reading
    private List<String> m_replicaBrokers = null;

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    // consume data from a given topic, partition and offset
    public static void main(String args[]) {
        // broker IPs (seed brokers)
        List<String> seeds = new ArrayList<String>();
        seeds.add("192.168.72.141");
        seeds.add("192.168.72.142");
        seeds.add("192.168.72.143");
        // Kafka port
        int port = Integer.parseInt("9092");
        // topic, partition number, offset
        String topic = "first";
        int partition = Integer.parseInt("0");
        int offset = 0;
        SimpleExample example = new SimpleExample();
        // maximum number of messages to read
        long maxReads = Long.parseLong("3");
        try {
            // hand everything to the main method
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    // main method
    // parameters: max messages to read, topic name, partition number, broker seed list, port
    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // fetch the metadata of the requested topic partition
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        // host of the leader replica
        String leadBroker = metadata.leader().host();
        // client name
        String clientName = "Client_" + a_topic + "_" + a_partition;
        // low-level client
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        // where to resume reading from
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);
            if (fetchResponse.hasError()) {
                numErrors++;
                // something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // we asked for an invalid offset; for the simple case, reset to the latest offset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                // print the message
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    // get the offset to resume from
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    // the previous leader lookup failed: find the new leader, retrying three times
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through, if the leader hasn't changed, give ZooKeeper a second to recover;
                // second time, assume the broker did recover before failover, or it was a non-broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                Thread.sleep(1000);
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    // find the metadata of the requested partition
    // parameters: broker IP list, port, topic name, partition number
    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        // partition metadata to return
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            // low-level consumer
            SimpleConsumer consumer = null;
            try {
                // client: broker IP, timeout, buffer size, client id (any name will do)
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                // the request can carry several topics at once
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                // send the request, get the response
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                // topic metadata from the response
                List<TopicMetadata> metaData = resp.topicsMetadata();
                // metaData may describe many topics
                for (TopicMetadata item : metaData) {
                    // each topic has many partitions; walk them
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        // stop at the partition number we were asked for
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        // returnMetaData is the partition we were looking for
        if (returnMetaData != null) {
            // remember the hosts of its replicas as fallback brokers
            m_replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}