Kafka入门使用
1. 阻塞队列
-
BlockingQueue(接口)
解决线程通信的问题;
阻塞方法:put、take。
-
生产者消费者模式
生产者:产生数据的线程。
消费者:使用数据的线程。
-
实现类
ArrayBlockingQueue;
LinkedBlockingQueue;
PriorityBlockingQueue、SynchronousQueue、DelayQueue等。
-
代码实现
import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueTests { public static void main(String[] args) { BlockingQueue queue = new ArrayBlockingQueue(10); new Thread(new Producer(queue)).start(); new Thread(new Consumer(queue)).start(); new Thread(new Consumer(queue)).start(); new Thread(new Consumer(queue)).start(); } } class Producer implements Runnable { private BlockingQueue<Integer> queue; public Producer(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { try { for(int i = 0; i < 100; i++){ Thread.sleep(20); queue.put(i); System.out.println(Thread.currentThread().getName() + "生产:" + queue.size() ); } } catch (Exception e) { e.printStackTrace(); } } } class Consumer implements Runnable { private BlockingQueue<Integer> queue; public Consumer(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { try { while (true) { Thread.sleep(new Random().nextInt(1000)); queue.take(); System.out.println(Thread.currentThread().getName() + "消费:" + queue.size()); } } catch (Exception e) { e.printStackTrace(); } } }
2. Kafka入门
-
Kafka简介
Kafka是一个分布式的流媒体平台。
应用:消息系统、日志收集、用户行为追踪、流式处理。
-
Kafka特点
高吞吐量、消息持久化、高可靠性、高扩展性。
-
Kafka术语
Broker、Zookeeper
Topic、Partition、Offset
Leader Replica、Follower Replica
-
下载并配置Kafka
打开Kafka下载目录下的config的zookeeper.properties,修改里面的dataDir为你想要的位置。
再打开同为Kafka下载目录下的config的server.properties,修改里面的log,dir(日志存放位置)为你想要的位置。
-
Windows系统下启动Kafka
先启动zookeeper,打开命令行,cd到kafka的下载目录,
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
再打开一个命令行,cd到kafka的下载目录,
bin\windows\kafka-server-start.bat config\server.properties
再次打开一个命令行,cd到kafka的下载目录里的bin目录下的windows目录下,
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
即可创建一个名为test的主题,kafka-topics.bat --list --bootstrap-server localhost:9092
即可查看所有主题。利用生产者发消息:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
,回车之后即可发消息。再次启动一个命令行窗口来查看消费者读消息,依旧cd到kafka的下载目录里的bin目录下的windows目录下,
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic tset --from-beginning
,回车之后即可看到生产者发的消息。
3. Spring整合Kafka
-
引入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
-
配置Kafka
配置server、consumer。
# KafkaProperties spring.kafka.bootstrap-servers=localhost:9092 # 消费者的分组id spring.kafka.consumer.group-id=community-consumer-group # 是否自动提交 spring.kafka.consumer.enable-auto-commit=true # 自动提交的频率 spring.kafka.consumer.auto-commit-interval=3000
-
访问Kafka
生产者:
kafkaTemplate.send(topic,data);
消费者:
@KafkaListener(topics = {"test}) public void handleMessage(ConsumerRecord record){}
-
代码实现
public class KafkaTests { @Autowired private KafkaProducer kafkaProducer; @Test public void testKafka() { kafkaProducer.sendMessage("test","你好"); kafkaProducer.sendMessage("test","在吗"); try { Thread.sleep(1000 * 10); } catch (InterruptedException e) { e.printStackTrace(); } } } @Component class KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String topic,String content) { kafkaTemplate.send(topic,content); } } @Component class KafkaConsumer { @KafkaListener(topics = {"test"}) public void handleMessage(ConsumerRecord record) { System.out.println(record.value()); } }
本文地址:https://blog.csdn.net/GoodBoyKaiii/article/details/110292614
上一篇: 搭建Eureka,实现服务注册与发现
下一篇: java开发入门基本概念