欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

学成在线-第14天-讲义-媒资管理 二

程序员文章站 2022-05-30 21:21:23
...

1.3 发送视频处理消息
当视频上传成功后向MQ 发送视频 处理消息。
修改媒资管理服务的文件上传代码,当文件上传成功向MQ发送视频处理消息。 
1.3.1 RabbitMQ配置 
1、将media-processor工程下的RabbitmqConfifig配置类拷贝到media工程下。 
2、在media工程下配置mq队列等信息 
修改application.yml

xc‐service‐manage‐media:
mq:
queue‐media‐video‐processor: queue_media_video_processor
routingkey‐media‐video: routingkey_media_video

1.3.2 修改Service 
在文件合并方法中添加向mq发送视频处理消息的代码:

//向MQ发送视频处理消息
public ResponseResult sendProcessVideoMsg(String mediaId){
Optional<MediaFile> optional = mediaFileRepository.findById(fileMd5);
if(!optional.isPresent()){
return new ResponseResult(CommonCode.FAIL);
}
MediaFile mediaFile = optional.get();
//发送视频处理消息
Map<String,String> msgMap = new HashMap<>();
msgMap.put("mediaId",mediaId);
//发送的消息
String msg = JSON.toJSONString(msgMap);
try {
this.rabbitTemplate.convertAndSend(RabbitMQConfig.EX_MEDIA_PROCESSTASK,routingkey_media_video,
msg);
LOGGER.info("send media process task msg:{}",msg);
}catch (Exception e){
e.printStackTrace();
LOGGER.info("send media process task error,msg is:{},error:{}",msg,e.getMessage());
return new ResponseResult(CommonCode.FAIL);
}
return new ResponseResult(CommonCode.SUCCESS);
}

mergechunks方法最后调用sendProcessVideo方法。
 

......
//状态为上传成功
mediaFile.setFileStatus("301002");
mediaFileRepository.save(mediaFile);
String mediaId = mediaFile.getFileId();
//向MQ发送视频处理消息
sendProcessVideoMsg(mediaId);
......

1.4 视频处理测试 
测试流程: 
1、上传avi文件 
2、观察日志是否发送消息 
3、观察视频处理进程是否接收到消息进行处理 
4、观察mp4文件是否生成 
5、观察m3u8及 ts文件是否生成
1.5 视频处理并发设置 
代码中使用@RabbitListener注解指定消费方法,默认情况是单线程监听队列,可以观察当队列有多个任务时消费端每次只消费一个消息,单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源。
可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。 
1、在RabbitmqConfifig.java中添加容器工厂配置: 

//消费者并发数量
public static final int DEFAULT_CONCURRENT = 10;
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory
containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory
connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
configurer.configure(factory, connectionFactory);
return factory;
}

2、在@RabbitListener注解中指定容器工厂

//视频处理方法
@RabbitListener(queues = {"${xc‐service‐manage‐media.mq.queue‐media‐video‐processor}"},
containerFactory="customContainerFactory")

再次测试当队列有多个任务时消费端的并发处理能力。