Kafka 2.3 Producer (applies to 0.9 and later)
程序员文章站
2022-09-07 12:11:18
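Since version 0.9, Kafka has shipped a producer rewritten in Java that replaces the original Scala implementation.
This article uses the latest release, 2.3, directly; the code works with any version from 0.9 onward.
Note that the classes come from the package org.apache.kafka.clients.producer.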
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        // Broker addresses and basic delivery settings
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // Send one record without a key to the topic named "topic"
        kafkaProducer.send(new ProducerRecord<>("topic", "value"));
        // close() waits for previously sent records to complete
        kafkaProducer.close();
    }
}
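The settings above can be factored into a small helper so that several producers share one baseline. The following is a minimal sketch that uses only java.util.Properties; the class name ProducerConfigSketch and the method baseConfig are hypothetical and not part of the Kafka client API. The comments summarize what each setting does.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
class ProducerConfigSketch {

    // Builds the same settings as the example above for a given broker list.
    static Properties baseConfig(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Wait until all in-sync replicas have acknowledged each record.
        props.put("acks", "all");
        // Do not resend a record after a failed send.
        props.put("retries", 0);
        // Upper bound, in bytes, on a batch of records for one partition.
        props.put("batch.size", 16384);
        // Wait up to 1 ms for more records before sending a batch.
        props.put("linger.ms", 1);
        // Total memory, in bytes (32 MiB), for buffering unsent records.
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

A producer could then be created with new KafkaProducer<String, String>(ProducerConfigSketch.baseConfig("kafka01:9092,kafka02:9092")).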
Transactions were added in 0.11.0. The following transactional producer example requires version 0.11.0 or later:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionsProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");
        Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        // Must be called once before any transactional operation
        producer.initTransactions();

        try {
            producer.beginTransaction();
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
        producer.close();
    }
}
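A transactional producer relies on the idempotent producer, which in turn requires acks=all. As a rough sketch of the configuration side only, the hypothetical helper below builds such a configuration with plain java.util.Properties and spells those implied settings out explicitly. The names TransactionalConfigSketch and transactionalConfig are illustrative and not Kafka API.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
class TransactionalConfigSketch {

    // Builds producer settings for a transactional producer.
    static Properties transactionalConfig(String bootstrapServers, String transactionalId) {
        if (transactionalId == null || transactionalId.isEmpty()) {
            throw new IllegalArgumentException("transactional.id must be non-empty");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Identifies this producer across restarts so the broker can fence older instances.
        props.put("transactional.id", transactionalId);
        // Transactions require the idempotent producer; stated explicitly for readability.
        props.put("enable.idempotence", "true");
        // The idempotent producer requires acknowledgement from all in-sync replicas.
        props.put("acks", "all");
        return props;
    }
}
```

The result can be passed to the same three-argument KafkaProducer constructor used in the example above, in place of the hand-built props.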