欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java使用kafka发送和生产消息的示例

程序员文章站 2023-11-29 09:13:16
1. maven依赖包 org.apache.kafka...

1. maven依赖包

<dependency> 
 <groupid>org.apache.kafka</groupid> 
 <artifactid>kafka-clients</artifactid> 
 <version>0.9.0.1</version> 
</dependency> 

2. 生产者代码

package com.lnho.example.kafka;  
import org.apache.kafka.clients.producer.kafkaproducer; 
import org.apache.kafka.clients.producer.producer; 
import org.apache.kafka.clients.producer.producerrecord;   
import java.util.properties;   
public class kafkaproducerexample { 
 public static void main(string[] args) { 
  properties props = new properties(); 
  props.put("bootstrap.servers", "master:9092"); 
  props.put("acks", "all"); 
  props.put("retries", 0); 
  props.put("batch.size", 16384); 
  props.put("linger.ms", 1); 
  props.put("buffer.memory", 33554432); 
  props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer"); 
  props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");   
  producer<string, string> producer = new kafkaproducer<>(props); 
  for(int i = 0; i < 100; i++) 
   producer.send(new producerrecord<>("topic1", integer.tostring(i), integer.tostring(i)));   
  producer.close(); 
 } 
} 

3. 消费者代码

package com.lnho.example.kafka;   
import org.apache.kafka.clients.consumer.consumerrecord; 
import org.apache.kafka.clients.consumer.consumerrecords; 
import org.apache.kafka.clients.consumer.kafkaconsumer; 
import java.util.arrays; 
import java.util.properties;   
public class kafkaconsumerexample { 
 public static void main(string[] args) { 
  properties props = new properties(); 
  props.put("bootstrap.servers", "master:9092"); 
  props.put("group.id", "test"); 
  props.put("enable.auto.commit", "true"); 
  props.put("auto.commit.interval.ms", "1000"); 
  props.put("session.timeout.ms", "30000"); 
  props.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); 
  props.put("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); 
  kafkaconsumer<string, string> consumer = new kafkaconsumer<>(props); 
  consumer.subscribe(arrays.aslist("topic1")); 
  while (true) { 
   consumerrecords<string, string> records = consumer.poll(100); 
   for (consumerrecord<string, string> record : records) 
    system.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); 
  } 
 } 
} 

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。