商品详情页系统架构-笔记8 - 详情页缓存架构设计
目录
3、基于kafka+ehcache_redis完成缓存数据生产服务的开发
1、阶段总结
时效性要求很高的数据
库存,采取的是数据库+缓存双写的技术方案,也解决了双写的一致性的问题
时效性不高的数据
比如一些商品的基本信息,如果发生了变更,假设在5分钟之后再更新到页面中,供用户观察到,也是ok的
时效性要求不高的数据,那么我们采取的是异步更新缓存的策略
【缓存数据生产服务】
监听一个消息队列,然后数据源服务(商品信息管理服务)发生了数据变更之后,就将数据变更的消息推送到消息队列中
缓存数据生产服务可以去消费到这个数据变更的消息,然后根据消息的指示提取一些参数,然后调用对应的数据源服务的接口,拉去数据,这个时候一般是从mysql库中拉去的
2、zk、kafka集群安装
参考本人博客:https://blog.csdn.net/allensandy/article/details/89456977
3、基于kafka+ehcache_redis完成缓存数据生产服务的开发
pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1</version>
</dependency>
监听器
系统初始化的监听器
/**
* 系统初始化的监听器
* @author Administrator
*
*/
public class InitListener implements ServletContextListener {
public void contextInitialized(ServletContextEvent sce) {
ServletContext sc = sce.getServletContext();
ApplicationContext context = WebApplicationContextUtils.getWebApplicationContext(sc);
SpringContext.setApplicationContext(context);
new Thread(new KafkaConsumer("cache-message")).start();
}
public void contextDestroyed(ServletContextEvent sce) {
}
}
@Bean
public ServletListenerRegistrationBean servletListenerRegistrationBean() {
ServletListenerRegistrationBean servletListenerRegistrationBean =
new ServletListenerRegistrationBean();
servletListenerRegistrationBean.setListener(new InitListener());
return servletListenerRegistrationBean;
}
kafkaConsumer
public class KafkaConcusmer implements Runnable {
private final ConsumerConnector consumer;
private final String topic;
public ConsumerGroupExample(String topic) {
consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181");
props.put("group.id", "eshop-cache-group");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for (final KafkaStream stream : streams) {
new Thread(new KafkaMessageProcessor(stream)).start();
}
}
}
kafkaMessageProcesor kafka消息处理线程
public class KafkaMessageProcessor implements Runnable {
private KafkaStream kafkaStream;
public ConsumerTest(KafkaStream kafkaStream) {
this.kafkaStream = kafkaStream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
while (it.hasNext()) {
String message = new String(it.next().message());
}
}
}
4、业务逻辑
编写业务逻辑
(1)两种服务【商品信息服务,商品店铺信息服务】会发送来数据变更消息:商品信息服务,商品店铺信息服务,每个消息都包含服务名以及商品id
(2)接收到消息之后,根据商品id到对应的服务拉取数据,这一步,我们采取简化的模拟方式,就是在代码里面写死,会获取到什么数据,不去实际再写其他的服务去调用了
(3)商品信息:id,名称,价格,图片列表,商品规格,售后信息,颜色,尺寸
(4)商品店铺信息:其他维度,用这个维度模拟出来缓存数据维度化拆分,id,店铺名称,店铺等级,店铺好评率
(5)分别拉取到了数据之后,将数据组织成json串,然后分别存储到ehcache中,和redis缓存中
测试业务逻辑
(1)创建一个kafka topic
(2)在命令行启动一个kafka producer
(3)启动系统,消费者开始监听kafka topic
C:\Windows\System32\drivers\etc\hosts
(4)在producer中,分别发送两条消息,一个是商品信息服务的消息,一个是商品店铺信息服务的消息
(5)能否接收到两条消息,并模拟拉取到两条数据,同时将数据写入ehcache中,并写入redis缓存中
(6)ehcache通过打印日志方式来观察,redis通过手工连接上去来查询
业务代码 略。
本文地址:https://blog.csdn.net/allensandy/article/details/109624531
上一篇: Spring Cloud Alibaba#23.微服务为什么要引入网关
下一篇: 简单集群架构