欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Kafka 2.3 Producer (0.9以后版本适用)

程序员文章站 2022-04-20 18:06:58
kafka0.9版本以后用java重新编写了producer,废除了原来scala编写的版本。 这里直接使用最新2.3版本,0.9以后的版本都适用。 注意引用的包为:org.apache.kafka.clients.producer 0.11.0以后增加了事务,事务producer的示例代码如下,需 ......

kafka0.9版本以后用java重新编写了producer,废除了原来scala编写的版本。

这里直接使用最新2.3版本,0.9以后的版本都适用。

注意引用的包为:org.apache.kafka.clients.producer

import java.util.properties;
import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producerrecord;

public class producerdemo {

    public static void main(string[] args) {

        properties properties = new properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");
        kafkaproducer<string, string> kafkaproducer = new kafkaproducer<string, string>(properties);
        kafkaproducer.send(new producerrecord<>("topic", "value"));
        kafkaproducer.close();

    }
    
}

0.11.0以后增加了事务,事务producer的示例代码如下,需要适用于0.11.0以后的版本:

import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producer;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.common.kafkaexception;
import org.apache.kafka.common.errors.authorizationexception;
import org.apache.kafka.common.errors.outofordersequenceexception;
import org.apache.kafka.common.errors.producerfencedexception;
import org.apache.kafka.common.serialization.stringserializer;

import java.util.properties;

public class transactionsproducerdemo {

    public static void main(string[] args) {

        properties props = new properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");
        producer<string, string> producer = new kafkaproducer<>(props, new stringserializer(), new stringserializer());

        producer.inittransactions();

        try {
            producer.begintransaction();
            for (int i = 0; i < 100; i++)
                producer.send(new producerrecord<>("my-topic", integer.tostring(i), integer.tostring(i)));
            producer.committransaction();
        } catch (producerfencedexception | outofordersequenceexception | authorizationexception e) {
            // we can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (kafkaexception e) {
            // for all other exceptions, just abort the transaction and try again.
            producer.aborttransaction();
        }
        producer.close();

    }
    
}

更多实时计算,kafka等相关技术博文,欢迎关注实时流式计算

Kafka 2.3 Producer (0.9以后版本适用)