spring boot redis->线程池->消息队列->线程池
spring boot redis->线程池->消息队列->线程池
好久没写博客,看起来这个习惯还是很难轻易养成啊!
为了适配我在网上找的一个前端项目的数据格式,我给我原本的model 类Post的属性String imgUrl改为了List imgUrls,按照常规思路,肯定就是新建一个表来存储imgUrlds,不过这很不优雅不是吗?想我当初为什么弃更强大的xml不用,偏要用注解来select数据,不就是因为注解更优雅吗?可是我弄了这个,不是反倒更不优雅。
所以怎么办呢?我想到了redis。大家都知道redis是一个Key-Value形式的nosql数据库,其中redis为Value提供多种数据类型,其中一个就是数组,所以我为什么不用redis来存储这个imgUrls呢?
redis 存储读取imgUrls list
说干就干
首先常规准备工作
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
yml配置
spring:
redis:
database: 0
host: 127.0.0.1
port: 6379
timeout: 3000
lettuce:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
然后redisTemplate push,select 搞定,一切顺利
@Test
public void testRedisIns(){
List<String> postIds= Arrays.asList("235", "456", "908","123");
// List<String> postIds= Arrays.asList("123");
List<String> imgUrls=Arrays.asList("https://images.pexels.com/photos/2584055/pexels-photo-2584055.jpeg?auto=compress&cs=tinysrgb&dpr=1&w=500",
"https://images.pexels.com/photos/998904/pexels-photo-998904.jpeg?auto=compress&cs=tinysrgb&dpr=1&w=500",
"https://images.pexels.com/photos/2793453/pexels-photo-2793453.jpeg?auto=compress&cs=tinysrgb&dpr=1&w=500",
"https://images.pexels.com/photos/4099385/pexels-photo-4099385.jpeg?auto=compress&cs=tinysrgb&dpr=1&w=500",
"https://images.pexels.com/photos/3755553/pexels-photo-3755553.jpeg?auto=compress&cs=tinysrgb&dpr=1&w=500");
for(String postId:
postIds){
Integer randomCount=(int)(1+Math.random()*3);
log.info("random count ");
log.info(String.valueOf(randomCount));
for (int i = 0; i < randomCount; i++) {
// 往 List 右侧插入一个元素
redisTemplate.opsForList().rightPush(postId,imgUrls.get((int)(Math.random()*imgUrls.size())));
}
}
}
List<String> findImgUrlsByItemId(Long itemId){
// 根据range从list取元素
List<String> urls=redisTemplate.opsForList().range(String.valueOf(itemId),0,-1);
return urls;
}
可是这样还没有完,我的小脑袋瓜子又想到,既然从MySQL和redis提取数据都是不相互依赖的,那么为什么我不弄一个多线程来稍微提提速呢?
来个线程池提提速
说干就干,首先从线程池配置开始吧!
import com.example.business_server.utils.Constant;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
// 开启异步调用
public class AsyncConfig {
@Bean("dbExcutor")
public Executor dbExcutor(){
ThreadPoolTaskExecutor executor=new ThreadPoolTaskExecutor();
// 核心线程数:线程池创建时候初始化的线程数
executor.setCorePoolSize(Constant.INIT_THREAD_NUM);
// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(Constant.MAX_THREAD_NUM);
// 缓冲队列:用来缓冲执行任务的队列
executor.setQueueCapacity(Constant.QUEUE_CAPACITY);
// 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(Constant.ALIVED_SECONDS);
// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix (Constant.DB_EXCUTOR_PREFIX);
// 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.initialize();
return executor;
}
}
开启异步的方式也很简单,只要在相应方法上加个@Async就可以了
@Async
List<String> findImgUrlsByItemId(Long itemId){
List<String> urls=redisTemplate.opsForList().range(String.valueOf(itemId),0,-1);
return urls;
}
然而好景不长,我突然发现我要想获得异步方法的返回值的话,必须要给想返回的数据类型再套一个CompletableFuture,就像这样
@Async
CompletableFuture<List<String>> findImgUrlsByItemId(Long itemId){
List<String> urls=redisTemplate.opsForList().range(String.valueOf(itemId),0,-1);
return CompletableFuture.completedFuture(urls);
}
可是redis这边还好说,mysql那边我是使用mybatis直接在抽象方法上加注解的,我怎么能改得了返回类型呢?可是我怎么会是这么容易放弃的人呢,我突然想到了我之前偶然之间听说过的消息队列好像也有类似的功能。
试试消息队列
说干就干!
首先配置erlang环境,下载rabbitMQ
然后加依赖,配yml
<!-- rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
# 这是默认用户名和密码
username: guest
password: guest
接着也写一个配置类,反正现在只在这一个地方用,所以也不需要什么fanout,topic,直接上最普通的queue
@Configuration
public class RabbitConfig {
@Bean
public Queue dbQueue(){
return new Queue("dbQueue");
}
}
上面配置类搞得是queue,可是这个queue也要一个生产者
package com.example.business_server.producer;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitProducer {
private final AmqpTemplate template;
public RabbitProducer(AmqpTemplate template) {
this.template = template;
}
public void sendFetchDataQueue(Long itemId){
//
this.template.convertAndSend("dbQueue",itemId);
}
}
然后是我亲爱的消费者
package com.example.business_server.service.impl;
import com.example.business_server.dao.PostMapper;
import com.example.business_server.model.domain.Post;
import com.example.business_server.model.recom.Recommendation;
import com.example.business_server.producer.RabbitProducer;
import com.example.business_server.service.PostService;
import lombok.SneakyThrows;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
@RabbitListener(queues = "dbQueue")
public class PostServiceImpl implements PostService {
@Autowired
private RabbitProducer rabbitProducer;
private final RedisTemplate<String,String> redisTemplate;
private final PostMapper postMapper;
public PostServiceImpl(PostMapper postMapper, RedisTemplate<String,String> redisTemplate) {
this.postMapper = postMapper;
this.redisTemplate = redisTemplate;
}
@RabbitHandler
List<String> findImgUrlsByItemId(Long itemId){
List<String> urls=redisTemplate.opsForList().range(String.valueOf(itemId),0,-1);
return urls;
}
}
最后就是在PostServiceImpl中调用一下生产者就可以了,
@Override
public List<Post> findPostsByRecommendations(List<Recommendation> recommendations) {
List<Post> posts=new ArrayList<>();
for (Recommendation recommendation :
recommendations) {
//
Post post
}
return posts;
}
可是当我打下Post post的时候我突然发现这个生产者根本没有返回值(啊啊啊啊啊啊啊啊!!!!!!)
没办法没办法赶紧再搜一次消息队列,我发现…(后面再说了)
最后我还是回到了使用多线程的方式,其实解决那个dao层不能改返回类型的问题,其实很简单,只要
@Async
// 在PostServiceImpl再新建一个函数调用dao层,并包装一下就可以
CompletableFuture<Post> findPostById(Long itemId){
Post post=postMapper.findPostById(itemId);
return CompletableFuture.completedFuture(post);
}
Ok,一切都已经搞定了,让我来重新复盘一下吧。
复盘
首先要不要为了这个新改的imgUrls,然后弄一个redis。
我的观点是这值得
这确实增加了复杂度,然而因为这本来就是一个小改动,而且redis相对来说也更加简单,所以真不见得比在mybatis直接改注解,mysql数据库修改原表和新增加一个表要简单,事实上我认为反倒上手更加容易
另一方面redis的存取也更快,我也不需要通过imgUrls value来寻找itemId key
所以综合以上redis的速度更快,也很好地满足需求,上手也更加容易,那显然这是一个好的选择。
其次是消息队列的使用场景问题——什么时候用?
在问什么时候用之前,首先要问一问什么时候不能用?
- 像我这种明显要获得消费者返回值的,肯定是不能用。
- 像我这种小型但又不是以学习目的为主的项目也不要用,因为很明显这会极大增加复杂度(我是以学习目的为主的项目当然是想方设法应用到更多的技术了呢)
那什么时候用呢?
-
异步处理处理的时候可以用。比如说你的应用系统注册成功的时候既要发邮件又要发短信的话,那完全没必要等这两个都发完才注册成功,而是可以直接先注册成功再通过消息队列通知发邮件、发短信
-
应用解耦:在我这次的实践中之所以最后还是选用了线程池而不是消息队列的最重要原因是消息队列不返回结果,然而这正是消息队列第二个应用场景的原因所在。消息队列不能获得返回值,也不必要关心最后成没成功,更不关心其成功的结果是什么,他只是通知而已,然后便认为一切都已经搞定了,没有我的事了,这就很好地达到了解耦的目的。(当然这也是消息队列的一个缺陷所在)
名词解释——什么是耦合
我在到这里的时候,突然发现我对耦合的概念还是不是很清晰。故再次网上搜索,得出了我自己的理解。
什么是低耦合?一个很好的例子就是高中。在高中我们完全不需要与什么房东太太,与什么物业打交道,学校已经为我们安排好了住宿;也不需要整天在复杂的外卖系统,楼下的饭店以及自己做之间艰难地选择了,学校已经为我们安排了餐饮;更加不需要应对一大堆讨厌的同事,霸道的领导以及可恶的丈母娘,学校没有“人情世故”。学校为我们安排了一切,而我们只需要交钱,然后三年只做一件事——专心学习。
-
流量削峰:就是你的应用如果短时间有大量用户同时访问(比如秒杀)的话,消息队列可以帮助你控制流量在一定长度,并把超过长度的请求直接抛弃
-
还可以用于接受用户日志的消息队列以及消息通讯。