欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka生产者(Producer) API 生产数据

程序员文章站 2022-06-14 13:39:25
...

小编对以下的四种情况进行讨论:

  1. 过时的API实现
  2. 新API实现
  3. 带有回调函数的API实现
  4. 自定义分区的API实现

所用的pom文件如下所示:

<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
        </dependency>
</dependencies>

过时的API实现

package com.myStudy.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.KeyedMessage$;
import kafka.producer.ProducerConfig;
//import org.apache.kafka.clients.producer.Producer; // 新的Kafka API
import java.util.Properties;

/**
 * kafka 生产者 过时的API
 */
public class OldProducer {
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();

        // Properties() 继承自HashTable  HashTable 又继承自Map 所以这里用的是Map的put来放数据
        // 1设置zookeeper的地址
        properties.put("metadata.broker.list", "hodoop01:9092");

        // ack 等级为1 的时候,此时只是保证leader将数据写入本地的log中,无需等待所有follower写入完成
        // ack 的等级有 0 1 all 三个等级 0的意思是不会等待任何的确认,数据发送到socket缓冲区中认为已经发送
        // all 是等所有的leader和follower都保存好数据之后

        // 2.设置kafka的ack等级
        properties.put("request.require.ack", 1);
        // 3.设置序列化
        properties.put("serializer", "kafka.serializer.StringEncoder");

        // kafka 1.x当中直接使用的是Map的方法
        // 4.创建生产者
        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties));

        // 5.kafka 消息的topic 和 消息的内容设置
        KeyedMessage message = new KeyedMessage<Integer, String>("first", "helloworld");

        // 6.发送消息
        producer.send(message);
    }
}

新API实现

package com.myStudy.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * kafka 新的消费者API 用的是KafkaProducer
 */
public class NewProducer {
    public static void main(String[] args) {
        // 1.创建配置对象
        Properties properties = new Properties();

        // 2.设置配置信息
        // kafka 服务端的主机名和端口号
        properties.put("bootstrap.servers", "hadoop03:9092");
        // 等待所有副本的应答
        properties.put("acks", "all");
        // 消息发送最大的尝试次数
        properties.put("retries", 0);
        // 一批消息的处理大小
        properties.put("batch.size", 16384);
        // 请求的延迟
        properties.put("linger.ms", 1);
        // 发送缓冲区内存大小
        properties.put("buffer.size", 33554432);
        // key 序列化
        properties.put("key.serializer", "org.apache.kafka.common.serilization.StringSerilizer");
        // value 序列化
        properties.put("value.serializer", "org.apache.kafka.common.serilization.StringSerilizer");

        // KafkaProducer 有多个构造方法,可以用Map来进行社会参数,也可在构造方法中进行设置序列化
        KafkaProducer producer = new KafkaProducer<String, String>(properties);

        // 3生产数据
        for (int i = 0; i < 30; i++) {
            // ProducerRecord 中还可以设置 topic partition 时间戳 header 等信息
            producer.send(new ProducerRecord("first", Integer.toString(i), "helllo world-" + i));
        }

        // 4关闭生产者
        producer.close();


    }
}

带有回调函数的API实现

package com.myStudy.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class CallBackProducer {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Kafka服务端的主机名和端口号
        props.put("bootstrap.servers", "hadoop03:9092");
        // 等待所有副本节点的应答
        props.put("acks", "all");
        // 消息发送最大尝试次数
        props.put("retries", 0);
        // 一批消息处理大小
        props.put("batch.size", 16384);
        // 增加服务端请求延时
        props.put("linger.ms", 1);
        // 发送缓存区内存大小
        props.put("buffer.memory", 33554432);
        // key序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 30; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello" + i));
        }

        // 源码中设置的是:user-supplied callback to execute when the record has been acknowledged by the server
        // 设置记录被确认的回调函数
        producer.send(new ProducerRecord<String, String>("first", "hello kafka"), new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (metadata != null) {
                    // 获取分区和偏移量
                    System.out.println(metadata.partition() + "---" + metadata.offset());
                }
            }
        });
        producer.close();
    }
}

自定义分区的API实现

1.自定义分区类

package com.myStudy.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 自定义分区生产者:
 * 需求:将所有的数据存储到topic0号分区上面去
 * 实现分区接口重写分区方法
 */
public class CustomPartition implements Partitioner {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 控制分区
        return 0;
    }

    public void close() {

    }

    public void configure(Map<String, ?> configs) {

    }
}

2.使用自定义分区类

package com.myStudy.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 用自定义的分区来实现分区的控制
 */
public class CustomerPartitionProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka服务端的主机名和端口号
        props.put("bootstrap.servers", "hadoop03:9092");
        // 等待所有副本节点的应答
        props.put("acks", "all");
        // 消息发送最大尝试次数
        props.put("retries", 0);
        // 一批消息处理大小
        props.put("batch.size", 16384);
        // 增加服务端请求延时
        props.put("linger.ms", 1);
        // 发送缓存区内存大小
        props.put("buffer.memory", 33554432);
        // key序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 设置启用自定义分区
        props.put("partitioner.class", CustomPartition.class);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        producer.send(new ProducerRecord<String, String>("first", "1", "hello kafka"));

        producer.close();
    }
}

小编的生产者的API如上面所示