
5 , Kafka API operations : producer, consumer, new and old APIs

程序员文章站 2022-03-26 21:49:52

0 , Use cases :

  1. High-level API : everyday use
  2. Low-level API : debugging

I , Producer :

1 , Start a console consumer and leave it waiting (listening on the first topic) :

On node01 :

cd /export/servers/kafka_2.11-0.11.0.0
bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic first

2 , Create the Java project :

pom file :

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.heima</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- New API (Java client) -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- Old API (Scala core jar, needed for the deprecated producer/consumer classes) -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.0</version>
        </dependency>
    </dependencies>
</project>

3 , Create a producer (deprecated API)

  1. Code :
package kafkatest;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class OldProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        //  Broker list (old producer API)
        properties.put("metadata.broker.list", "node01:9092");
        //  Wait for the leader's acknowledgement
        properties.put("request.required.acks", "1");
        //  Serialize messages as strings
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        //  Create the producer
        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties));
        //  Create a message for topic "first" (no key)
        KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");
        //  Send the message, then release resources
        producer.send(message);
        producer.close();
    }
}
  2. Run the class.
  3. Result : the console consumer receives the message.

4 , Create a producer (new API)

  1. Code :
package kafkatest;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducer {
    public static void main(String[] args) {
        //  Configuration
        Properties props = new Properties();
        //  Host name and port of the Kafka broker
        props.put("bootstrap.servers", "node01:9092");
        //  Wait for acknowledgements from all in-sync replicas
        props.put("acks", "all");
        //  Number of retries on a failed send (0 = no retries)
        props.put("retries", 0);
        //  Maximum batch size in bytes
        props.put("batch.size", 16384);
        //  How long (ms) to wait for more records before sending a batch
        props.put("linger.ms", 1);
        //  Total memory (bytes) for buffering records not yet sent
        props.put("buffer.memory", 33554432);
        //  Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //  Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //  Create the producer
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        //  Send 50 messages (key = i, value = "hello world-i")
        for (int i = 0; i < 50; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
        }
        //  Release resources (close() also waits for buffered records to be sent)
        producer.close();
    }
}
  2. Run the class.
  3. Result : the consumer receives the messages.
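The comments on `batch.size` and `linger.ms` above only name the settings. As a rough illustration of how the two interact, here is a minimal, self-contained sketch in plain Java (no Kafka classes): a batch goes out as soon as it is full, otherwise once it has waited `linger.ms`. The class `BatchingSketch` and its methods are hypothetical; the real producer keeps one batch per partition and enforces the size limit somewhat differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified single-partition model of the batch.size / linger.ms rule.
// NOT Kafka's RecordAccumulator: it only mimics "send a batch when it is
// full, or when it has waited linger.ms".
public class BatchingSketch {
    private final int batchSizeBytes;   // stands in for batch.size
    private final long lingerMs;        // stands in for linger.ms
    private final List<String> current = new ArrayList<>();
    private int currentBytes = 0;
    private long firstAppendMs = -1;
    final List<List<String>> sentBatches = new ArrayList<>();

    BatchingSketch(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    // Add a record at time nowMs; a batch that reaches batchSizeBytes is sent at once.
    void append(String value, long nowMs) {
        if (current.isEmpty()) {
            firstAppendMs = nowMs;
        }
        current.add(value);
        currentBytes += value.getBytes(StandardCharsets.UTF_8).length;
        if (currentBytes >= batchSizeBytes) {
            send();
        }
    }

    // Called by a (hypothetical) sender loop: a non-full batch is sent once it has lingered long enough.
    void tick(long nowMs) {
        if (!current.isEmpty() && nowMs - firstAppendMs >= lingerMs) {
            send();
        }
    }

    private void send() {
        sentBatches.add(new ArrayList<>(current));
        current.clear();
        currentBytes = 0;
    }

    public static void main(String[] args) {
        // A tiny batch size (16 bytes) so the effect is visible with short strings
        BatchingSketch producer = new BatchingSketch(16, 1);
        producer.append("hello world-0", 0); // 13 bytes, batch not full yet
        producer.append("hello world-1", 0); // 26 bytes >= 16, batch sent
        producer.append("x", 0);
        producer.tick(0);                    // has not lingered 1 ms yet
        producer.tick(1);                    // lingered 1 ms, batch sent
        System.out.println(producer.sentBatches); // [[hello world-0, hello world-1], [x]]
    }
}
```

A small `linger.ms` such as the 1 ms used above trades a little latency for fuller batches.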

5 , Create a producer with a callback :

  1. Purpose : have a method invoked when each send completes (whether it succeeded or failed)
  2. Code :
package kafkatest;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallBackProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Host name and port of the Kafka broker
        props.put("bootstrap.servers", "node01:9092");
        // Wait for acknowledgements from all in-sync replicas
        props.put("acks", "all");
        // Number of retries on a failed send (0 = no retries)
        props.put("retries", 0);
        // Maximum batch size in bytes
        props.put("batch.size", 16384);
        // How long (ms) to wait for more records before sending a batch
        props.put("linger.ms", 1);
        // Total memory (bytes) for buffering records not yet sent
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // The send failed
                        exception.printStackTrace();
                    } else {
                        // Partition number --- offset of the record within that partition
                        System.out.println(metadata.partition() + "---" + metadata.offset());
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

  3. Run : click the green Run arrow in IntelliJ IDEA
  4. Result : the console shows, for each record, the partition it went to and its offset within that partition
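Each line printed by the callback is a partition number and an offset, and offsets count up separately in every partition. The hypothetical in-memory sketch below (plain Java, not the Kafka API) models a topic as one list per partition and reports the outcome through a callback shaped like `onCompletion(metadata, exception)`: on success the error is `null`, on failure the position fields carry no meaning.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a topic, only to illustrate what the
// callback reports. Each partition is its own append-only log, so offsets
// count up independently per partition. Not a Kafka API.
public class PartitionOffsetSketch {
    // Same shape as Kafka's Callback: error == null means success;
    // on failure partition/offset are meaningless (-1 here).
    interface SendCallback {
        void onCompletion(int partition, long offset, Exception error);
    }

    final List<List<String>> partitions = new ArrayList<>();

    PartitionOffsetSketch(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    void send(int partition, String value, SendCallback callback) {
        if (partition < 0 || partition >= partitions.size()) {
            callback.onCompletion(-1, -1L, new IllegalArgumentException("no such partition: " + partition));
            return;
        }
        List<String> log = partitions.get(partition);
        log.add(value);
        // The offset is the record's position in that partition's log
        callback.onCompletion(partition, log.size() - 1, null);
    }

    public static void main(String[] args) {
        PartitionOffsetSketch topic = new PartitionOffsetSketch(2);
        SendCallback print = (p, off, err) -> {
            if (err != null) {
                System.out.println("failed: " + err.getMessage());
            } else {
                System.out.println(p + "---" + off);
            }
        };
        for (int i = 0; i < 4; i++) {
            topic.send(i % 2, "hello" + i, print); // prints 0---0, 1---0, 0---1, 1---1
        }
        topic.send(5, "oops", print);              // prints failed: no such partition: 5
    }
}
```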

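6 , Create a producer : custom partitioner

  1. Requirement : store all data in partition 0 of the topic
  2. Approach : first define a partitioner class, then configure the producer to use it
  3. Custom partitioner :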
package kafkatest;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {

    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Send every record to partition 0
        return 0;
    }

    public void close() {

    }
}
  4. Producer :
package kafkatest;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // Host name and port of the Kafka broker
        props.put("bootstrap.servers", "node01:9092");
        // Wait for acknowledgements from all in-sync replicas
        props.put("acks", "all");
        // Number of retries on a failed send (0 = no retries)
        props.put("retries", 0);
        // Maximum batch size in bytes
        props.put("batch.size", 16384);
        // How long (ms) to wait for more records before sending a batch
        props.put("linger.ms", 1);
        // Total memory (bytes) for buffering records not yet sent
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use the custom partitioner (the key line)
        props.put("partitioner.class", "kafkatest.CustomPartitioner");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("first", "1", "sflsfl"));
        producer.close();
    }
}
  5. Run the class
  6. Check the partition's log directory :
    cd /export/servers/kafka_2.11-0.11.0.0/logs/first-0
    All the data ended up in partition 0
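`CustomPartitioner` ignores the key and always returns 0. For contrast, the sketch below shows the usual key-based alternative, where the same key always lands in the same partition. It is a hypothetical helper in plain Java: Kafka's own `DefaultPartitioner` hashes the serialized key with murmur2, while this sketch uses `String.hashCode()`, so its partition numbers will not match Kafka's placement. The 3-partition count is also an assumption.

```java
// Hypothetical key-based partitioning logic (plain Java, no Kafka classes).
// Kafka's DefaultPartitioner uses murmur2 on the key bytes; String.hashCode()
// is a stand-in here, so the chosen partitions differ from Kafka's.
public class KeyHashPartitionSketch {

    // What CustomPartitioner.partition() does: ignore the key entirely.
    static int alwaysZero(String key, int numPartitions) {
        return 0;
    }

    // Same key -> same partition; different keys spread over all partitions.
    static int byKeyHash(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so the result is never negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"0", "1", "2", "3"}) {
            // Prints 0 -> 0, 1 -> 1, 2 -> 2, 3 -> 0 (custom is always 0)
            System.out.println(key + " -> " + byKeyHash(key, 3) + " (custom: " + alwaysZero(key, 3) + ")");
        }
    }
}
```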

II , Consumer :

1 , Consumer, high-level API (commonly used)

  1. Start a producer in the console :
cd /export/servers/kafka_2.11-0.11.0.0
bin/kafka-console-producer.sh --broker-list node01:9092 --topic first
hello
  2. Old API :
package kafkatest;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class CustomConsumer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();
        //  The old consumer connects to ZooKeeper, not to the brokers
        properties.put("zookeeper.connect", "node01:2181");
        //  Consumer group
        properties.put("group.id", "g1");
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        //  Commit consumed offsets automatically every second
        properties.put("auto.commit.interval.ms", "1000");

        //  Create the consumer connector
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

        //  Topic -> number of streams to consume it with
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("first", 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);

        //  The single stream requested for topic "first"
        KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        //  Blocks until messages arrive, then prints each one
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}
  3. New API : consumed offsets are tracked and committed automatically
package kafkatest;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // Kafka broker address; there is no need to list every broker
        props.put("bootstrap.servers", "node01:9092");
        // Consumer group
        props.put("group.id", "test");
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        // Interval (ms) between automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        // Key deserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // Topics to subscribe to; several can be subscribed at once
        consumer.subscribe(Arrays.asList("first"));

        while (true) {
            // Poll for records, waiting at most 100 ms
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
  4. Run the class
  5. Send messages to Kafka from the console producer on node01
  6. Result : the messages from Kafka show up in the IDEA console
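With `enable.auto.commit=true` the consumer commits its position periodically, so a consumer that restarts resumes from the last commit and may receive some records a second time. The in-memory sketch below models this under stated assumptions: the commit check runs inside `poll()` and commits the position reached by earlier polls. `AutoCommitSketch` and `OffsetStore` are hypothetical names; a real consumer sends its commits to Kafka (and also commits on `close()`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified in-memory model of enable.auto.commit=true; no Kafka classes.
// Assumption: the auto-commit check runs inside poll() and commits the
// position reached by earlier polls.
public class AutoCommitSketch {
    // Stand-in for the group's committed offset that Kafka would store
    static final class OffsetStore {
        long committed = 0;
    }

    static final List<String> LOG = Arrays.asList("a", "b", "c", "d", "e");

    private final OffsetStore store;
    private final long intervalMs;   // stands in for auto.commit.interval.ms
    private long position;           // next offset this consumer will read
    private long lastCommitMs;

    AutoCommitSketch(OffsetStore store, long intervalMs, long nowMs) {
        this.store = store;
        this.intervalMs = intervalMs;
        this.position = store.committed; // a (re)started consumer resumes at the committed offset
        this.lastCommitMs = nowMs;
    }

    List<String> poll(int maxRecords, long nowMs) {
        if (nowMs - lastCommitMs >= intervalMs) {
            store.committed = position;  // commit what earlier polls returned
            lastCommitMs = nowMs;
        }
        int from = (int) position;
        int to = (int) Math.min(LOG.size(), position + maxRecords);
        position = to;
        return new ArrayList<>(LOG.subList(from, to));
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        AutoCommitSketch c1 = new AutoCommitSketch(store, 1000, 0);
        System.out.println(c1.poll(2, 0));    // [a, b]  nothing committed yet
        System.out.println(c1.poll(2, 1000)); // [c, d]  offset 2 committed first
        // c1 "crashes" before its next commit; a new instance resumes at offset 2
        AutoCommitSketch c2 = new AutoCommitSketch(store, 1000, 1500);
        System.out.println(c2.poll(2, 1500)); // [c, d]  delivered a second time
    }
}
```

This is why automatic commits give at-least-once delivery: records processed after the last commit are read again after a restart.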

2 , Consumer, low-level API : when to use it

Rarely used; mainly for occasional debugging.

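3 , Consumer, low-level API : steps

Step  Task
1     Find the partition's leader replica from the topic metadata
2     Get the partition's offset to start reading from
3     Fetch the partition's messages from the leader
4     Detect leader changes and retry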
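Steps 2 and 3 boil down to a loop: start at an offset, fetch from the leader, skip anything older than the requested offset, and move on to `nextOffset` (offset + 1). The sketch below is a plain-Java model of that loop; the in-memory log and `fetch()` are hypothetical stand-ins for `SimpleConsumer.fetch()`, which can return messages from before the requested offset when messages are compressed (the whole compressed block comes back). Unlike `run()` in the code further down, this sketch stops exactly at `maxReads`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of the low-level fetch loop; no Kafka classes.
public class FetchLoopSketch {
    static final List<String> LOG = Arrays.asList("m0", "m1", "m2", "m3", "m4", "m5");

    static final class Msg {
        final long offset;
        final String payload;

        Msg(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }
    }

    // Pretend fetch: starts at the beginning of the 2-message "block" that holds
    // `offset`, so it may return offsets older than the one requested.
    static List<Msg> fetch(long offset, int maxMessages) {
        List<Msg> out = new ArrayList<>();
        long start = offset - (offset % 2);
        for (long o = start; o < LOG.size() && out.size() < maxMessages; o++) {
            out.add(new Msg(o, LOG.get((int) o)));
        }
        return out;
    }

    static List<String> read(long startOffset, long maxReads) {
        List<String> result = new ArrayList<>();
        long readOffset = startOffset;
        while (maxReads > 0 && readOffset < LOG.size()) {
            for (Msg m : fetch(readOffset, 3)) {
                if (m.offset < readOffset) {
                    continue;              // old offset from the block: skip it
                }
                readOffset = m.offset + 1; // nextOffset
                result.add(m.offset + ": " + m.payload);
                if (--maxReads == 0) {
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(read(3, 3)); // [3: m3, 4: m4, 5: m5]
    }
}
```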

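4 , Consumer, low-level API : methods

Method            Description
findLeader()      Sends a topic metadata request to the seed brokers, finds the partition's leader, and records the partition's replicas as fallback brokers
getLastOffset()   Sends an offset request and returns the partition's earliest or latest offset
run()             Main method of the low-level consumer; fetches the messages
findNewLeader()   Finds the new leader when the partition's current leader fails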
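`getLastOffset()` is called with one of two special "times": `kafka.api.OffsetRequest.EarliestTime()` (-2L) and `LatestTime()` (-1L). The plain-Java sketch below models what those mean for a partition reduced to two numbers, the first offset still retained and the end of the log (for consumers a real broker reports the end of what they may read). `OffsetLookupSketch` is a hypothetical name, and the example partition is assumed.

```java
// Simplified model of the offsets getLastOffset() asks for; no Kafka classes.
// The constants match kafka.api.OffsetRequest.EarliestTime() / LatestTime().
public class OffsetLookupSketch {
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    static long resolve(long whichTime, long logStartOffset, long logEndOffset) {
        if (whichTime == EARLIEST_TIME) {
            return logStartOffset; // oldest message not yet removed by retention
        }
        if (whichTime == LATEST_TIME) {
            return logEndOffset;   // offset the next message will receive
        }
        throw new IllegalArgumentException("timestamp lookups are not modelled: " + whichTime);
    }

    // Fetching outside [start, end] is what yields OffsetOutOfRangeCode in run()
    static boolean outOfRange(long offset, long logStartOffset, long logEndOffset) {
        return offset < logStartOffset || offset > logEndOffset;
    }

    public static void main(String[] args) {
        // Assumed partition: offsets 0-99 deleted by retention, 100-149 still present
        System.out.println(resolve(EARLIEST_TIME, 100, 150)); // 100
        System.out.println(resolve(LATEST_TIME, 100, 150));   // 150
        System.out.println(outOfRange(50, 100, 150));         // true
    }
}
```

This is why `run()` reacts to an out-of-range error by asking for `LatestTime()` and continuing from there.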

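5 , How it works :

  1. Create the consumer object
  2. Find the leader replica :
    if (the leader is healthy) {
        use the leader
    } else {
        look up the leader again
    }
  3. Consume the data from the leader.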
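The "look up the leader again" branch is what `findNewLeader()` in the code below does: up to three lookups with a pause between them, and on the first attempt an unchanged leader is treated as "not failed over yet". The sketch restates that rule in plain Java, with the metadata lookup abstracted as a `Supplier` that returns a leader host or `null`; `LeaderRetrySketch` is a hypothetical name, not a Kafka API.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical restatement of findNewLeader()'s retry rule; no Kafka classes.
public class LeaderRetrySketch {

    static String findNewLeader(String oldLeader, Supplier<String> lookupLeader, long sleepMs) {
        for (int i = 0; i < 3; i++) {
            String leader = lookupLeader.get(); // null = metadata or leader not available
            boolean sameLeaderFirstTry = leader != null && i == 0 && oldLeader.equalsIgnoreCase(leader);
            if (leader != null && !sameLeaderFirstTry) {
                // A new leader, or the old one still reported on a later try
                // (assume the broker recovered)
                return leader;
            }
            try {
                Thread.sleep(sleepMs); // give the cluster time to elect a leader
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a leader", e);
            }
        }
        throw new IllegalStateException("Unable to find new leader after broker failure");
    }

    public static void main(String[] args) {
        // Scripted lookups: old leader, then nothing, then the new leader
        Iterator<String> answers = Arrays.asList("node01", null, "node02").iterator();
        System.out.println(findNewLeader("node01", answers::next, 0)); // node02
    }
}
```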

6 , Code :

package kafkatest;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {

    //  Hosts of the partition's replicas; used as fallbacks when looking for a new leader
    private List<String> m_replicaBrokers = null;

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    //  Consume data (given topic, partition and maximum number of reads)
    public static void main(String args[]) {
        //  IP addresses of the seed brokers
        List<String> seeds = new ArrayList<String>();
        seeds.add("192.168.72.141");
        seeds.add("192.168.72.142");
        seeds.add("192.168.72.143");
        //  Kafka port
        int port = 9092;

        //  Topic and partition to read
        String topic = "first";
        int partition = 0;
        //  Note: reading starts at the earliest available offset (see run()),
        //  not at a caller-specified offset

        //  An instance of this class
        SimpleExample example = new SimpleExample();
        //  Maximum number of messages to read
        long maxReads = 3;

        try {
            //  Run the main method with all the parameters
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    //  Main method
    //  Parameters : maximum number of messages to read, topic name, partition number, seed broker hosts, port
    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        //  Get the metadata of the requested topic partition
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        //  Host of the partition's leader
        String leadBroker = metadata.leader().host();
        //  Client name
        String clientName = "Client_" + a_topic + "_" + a_partition;
        //  Low-level client connected to the leader
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        //  Starting offset: the earliest offset still available in the partition
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                //  Print the message
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    //  Restore the interrupt flag and stop reading
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }
    //  Get the partition's offset for the given time (EarliestTime() or LatestTime())
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    //  The previous leader failed; look up the new leader, trying up to three times
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                Thread.sleep(1000);
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    //  Get the metadata of the requested partition
    //  Parameters : seed broker hosts, port, topic name, partition number
    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        //  Partition metadata
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            //  Low-level consumer
            SimpleConsumer consumer = null;
            try {
                //  Client arguments : broker host, port, socket timeout (ms), buffer size (bytes), client id (any name)
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                //  Metadata request; it may cover several topics
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                //  Send the request and get the response
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                //  Topic metadata from the response
                List<TopicMetadata> metaData = resp.topicsMetadata();
                //  Written generically: metaData can hold the metadata of many topics
                for (TopicMetadata item : metaData) {
                    //  item is one topic's metadata; a topic has many partitions, so iterate over them
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        //  Find the partition with the requested number
                        if (part.partitionId() == a_partition) {
                            //  part is the partition we want
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        //  returnMetaData is the partition we were looking for
        if (returnMetaData != null) {
            //  List of replica hosts; refilled with the replicas found below
            m_replicaBrokers.clear();
            //  replica : one of the partition's replicas
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                //  Host of each replica
                m_replicaBrokers.add(replica.host());
            }
        }
        //  Return the partition we found
        return returnMetaData;
    }
}