Kafka 生产者消费者模式 写入/读取数据 [ 命令行/Java 代码 ]
一.确认配置文件:
打开config/server.properties 文件,修改broker.id,listeners,port,log.dirs
vi config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.105.110:9092
port=9092
log.dirs=kafka-logs
zookeeper.connect=192.168.105.110:2181
备注:
listeners一定要配置成为IP地址;
如果配置为localhost或服务器的hostname,在使用java获取数据时会拿不到数据,或者发送数据时就会抛出异常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。
因为在没有配置advertised.host.name 的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name,而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上这个hostname的
二.命令行创建生产者消费者
1.创建主题:
bin/kafka-topics.sh --create --zookeeper 192.168.105.110:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
2.查看主题列表
bin/kafka-topics.sh --list --zookeeper 192.168.105.110:2181
3. 启动生产者以发送消息
bin/kafka-console-producer.sh --broker-list 192.168.105.110:9092 --topic Hello-Kafka
4. 启动消费者,消费数据 (两种方式均可;注意: --zookeeper 方式仅适用于旧版本 Kafka,新版本(2.0 及以上)已移除该参数,请使用 --bootstrap-server 方式)
bin/kafka-console-consumer.sh --zookeeper 192.168.105.110:2181 --topic Hello-Kafka --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server 192.168.105.110:9092 --topic Hello-Kafka --from-beginning
三. java 代码
生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
/**
 * Minimal Kafka producer demo: publishes 100 random UUID strings, one per
 * second, to the {@code Hello-Kafka} topic and reports whether every record
 * was acknowledged.
 *
 * <p>The broker address and topic name are hard-coded for this demo
 * (project KafkaStreamDemo, created 2019/9/10 16:45).
 *
 * @author admin
 */
public class SimpleProducer {

    /**
     * Builds the producer configuration, sends the demo records and closes
     * the producer.
     *
     * @param args unused
     * @throws Exception if a send fails synchronously or the thread is
     *                   interrupted while sleeping between sends
     */
    public static void main(String[] args) throws Exception {
        // Topic the demo records are written to.
        String topicName = "Hello-Kafka";

        // Producer configuration, see http://kafka.apache.org/documentation/#producerapi
        Properties props = new Properties();
        // Broker(s) used for the initial connection.
        props.put("bootstrap.servers", "192.168.105.110:9092");
        // Wait for the full set of in-sync replicas to acknowledge each record.
        props.put("acks", "all");
        // Do not retry failed requests; failures are reported by the send callback below.
        props.put("retries", 0);
        // Upper bound, in bytes, of a per-partition batch.
        props.put("batch.size", 16384);
        // Wait up to 1 ms for more records so they can be grouped into one request.
        props.put("linger.ms", 1);
        // Total memory, in bytes, available to the producer for buffering unsent records.
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // send() is asynchronous, so delivery errors only surface in the callback;
        // count them there so the final message is truthful.
        final java.util.concurrent.atomic.AtomicInteger failures =
                new java.util.concurrent.atomic.AtomicInteger();

        // try-with-resources guarantees the producer is closed even when
        // send() or sleep() throws.
        try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
            for (int i = 0; i < 100; i++) {
                String tempString = UUID.randomUUID().toString();
                System.out.println("----------" + tempString);
                producer.send(new ProducerRecord<String, String>(topicName, tempString),
                        (metadata, exception) -> {
                            if (exception != null) {
                                failures.incrementAndGet();
                                System.err.println("Failed to send " + tempString + ": " + exception);
                            }
                        });
                Thread.sleep(1000);
            }
            // Block until every buffered record has been acknowledged or has failed.
            producer.flush();
        }

        if (failures.get() == 0) {
            System.out.println("Message sent successfully");
        } else {
            System.err.println(failures.get() + " message(s) failed to send");
        }
    }
}
消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
/**
 * Minimal Kafka consumer demo: subscribes to the {@code Hello-Kafka} topic
 * and prints the offset, key and value of every record it receives.
 *
 * <p>The broker address, consumer group and topic name are hard-coded for
 * this demo (project KafkaStreamDemo, created 2019/9/10 16:47).
 *
 * @author admin
 */
public class SimpleConsumer {

    /**
     * Builds the consumer configuration and polls the topic forever.
     *
     * @param args unused
     * @throws Exception if polling fails; the consumer is closed before the
     *                   exception propagates
     */
    public static void main(String[] args) throws Exception {
        // Kafka consumer configuration settings
        String topicName = "Hello-Kafka";
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.105.110:9092");
        props.put("group.id", "CountryCounter");
        // With no committed offset for the group, start from the end of the log,
        // i.e. only records produced after the consumer starts are seen.
        props.put("auto.offset.reset", "latest");
        // Offsets are committed automatically in the background every second.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer if an exception ends the poll loop.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
            // Kafka Consumer subscribes list of topics here.
            kafkaConsumer.subscribe(Collections.singletonList(topicName));
            while (true) {
                // poll(long) is deprecated; the Duration overload bounds the whole
                // call, including the time spent waiting for partition assignment.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // print the offset,key and value for the consumer records.
                    System.out.printf("offset = %d, key = %s, value = %s\n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
上一篇: 生产者和消费者