
Kafka Producer/Consumer Pattern: Writing and Reading Data [Command Line / Java Code]

程序员文章站 2022-07-12 17:38:08

 

I. Verify the configuration file

Open config/server.properties and set broker.id, listeners, port, and log.dirs:

vi config/server.properties 

broker.id=0
listeners=PLAINTEXT://192.168.105.110:9092
port=9092
log.dirs=kafka-logs
zookeeper.connect=192.168.105.110:2181

Notes:

listeners must be set to an IP address.

If it is set to localhost or the server's hostname, a Java client will receive no data when consuming, or will throw org.apache.kafka.common.errors.TimeoutException: Batch Expired when producing.

The reason: when advertised.host.name is not configured, Kafka does not advertise the configured host.name as the official documentation claims; it advertises the machine's own hostname instead. A remote client with no matching hosts entry cannot resolve that hostname, so it cannot connect.
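On newer brokers the same fix is usually expressed through advertised.listeners rather than the deprecated host.name/advertised.host.name settings. A minimal sketch, assuming the broker's routable IP is the 192.168.105.110 used above:

```properties
# listeners: the address the broker binds to
listeners=PLAINTEXT://192.168.105.110:9092
# advertised.listeners: the address handed to clients in metadata responses;
# must be resolvable/reachable from remote clients
advertised.listeners=PLAINTEXT://192.168.105.110:9092
```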
 

 

II. Create a producer and consumer from the command line



1. Create a topic:


bin/kafka-topics.sh --create --zookeeper 192.168.105.110:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka


2. List topics:


bin/kafka-topics.sh --list --zookeeper 192.168.105.110:2181


3. Start a producer to send messages:

 

bin/kafka-console-producer.sh --broker-list 192.168.105.110:9092 --topic Hello-Kafka

 

4. Start a consumer to read the data (either command works; the --zookeeper form is for older Kafka versions):


bin/kafka-console-consumer.sh --zookeeper 192.168.105.110:2181 --topic Hello-Kafka --from-beginning


bin/kafka-console-consumer.sh --bootstrap-server 192.168.105.110:9092 --topic Hello-Kafka --from-beginning


 

 

III. Java code

 

Producer


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;

/**
 * @author admin
 * @title SimpleProducer
 * @projectName KafkaStreamDemo
 * @description TODO
 * @date 2019/9/10 16:45
 */
public class SimpleProducer {
    public static void main(String[] args) throws Exception {

        // Assign topicName to string variable
        String topicName = "Hello-Kafka";

        // create instance for properties to access producer configs
        Properties props = new Properties();

        // Broker address; see http://kafka.apache.org/documentation/#producerapi
        props.put("bootstrap.servers", "192.168.105.110:9092");

        // Set acknowledgements for producer requests.
        props.put("acks", "all");

        // retries=0: do not retry automatically if a request fails
        props.put("retries", 0);

        // Specify buffer size in config
        props.put("batch.size", 16384);

        // Wait up to 1 ms so records can be batched into fewer requests
        props.put("linger.ms", 1);

        // The buffer.memory controls the total amount of memory available to the
        // producer for buffering.
        props.put("buffer.memory", 33554432);

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);


        for (int i = 0; i < 100; i++) {
            String value = UUID.randomUUID().toString();
            System.out.println("----------" + value);
            producer.send(new ProducerRecord<String, String>(topicName, value));
            Thread.sleep(1000);
        }

        System.out.println("Message sent successfully");
        producer.close();
    }
}
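The same Properties block appears in most producers, so it can help to factor it into a small helper and reuse it. A minimal sketch of the settings used above (the class and method names ProducerConfigSketch and buildProducerProps are illustrative, not part of the Kafka API):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Hypothetical helper: centralizes the producer settings from the example above.
    static Properties buildProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");        // wait for full acknowledgement before success
        props.put("retries", 0);         // no automatic retries, as in the example
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProducerProps("192.168.105.110:9092");
        System.out.println(props.getProperty("acks")); // prints all
    }
}
```

With this helper, the producer's main method only needs new KafkaProducer<>(buildProducerProps("192.168.105.110:9092")).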

 

 

 

Consumer

 



import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author admin
 * @title SimpleConsumer
 * @projectName KafkaStreamDemo
 * @description TODO
 * @date 2019/9/10 16:47
 */
public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        // Kafka consumer configuration settings
        String topicName = "Hello-Kafka";


        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.105.110:9092");


        props.put("group.id", "CountryCounter");

        props.put("auto.offset.reset", "latest");

        props.put("enable.auto.commit", "true");

        props.put("auto.commit.interval.ms", "1000");

        props.put("session.timeout.ms", "30000");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        // Kafka Consumer subscribes list of topics here.
        kafkaConsumer.subscribe(Collections.singletonList(topicName) );

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // print the offset,key and value for the consumer records.
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
            }
        }
    }
}
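With enable.auto.commit=true, offsets are committed at most every auto.commit.interval.ms, so that interval should stay well below session.timeout.ms; otherwise a rebalance can occur between two commits and replay more records than expected after a failure. A minimal sketch of such a sanity check (ConsumerConfigCheck and commitIntervalSane are my own names, not part of the Kafka API):

```java
import java.util.Properties;

public class ConsumerConfigCheck {
    // Hypothetical sanity check: the auto-commit interval should be smaller
    // than the session timeout, or a rebalance can strike between commits.
    static boolean commitIntervalSane(Properties props) {
        long commitMs = Long.parseLong(props.getProperty("auto.commit.interval.ms"));
        long sessionMs = Long.parseLong(props.getProperty("session.timeout.ms"));
        return commitMs < sessionMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        System.out.println(commitIntervalSane(props)); // prints true for the values above
    }
}
```

For the values in the example (1000 ms commit interval, 30000 ms session timeout), at most about one second of already-processed records can be re-delivered after a crash.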