rabbit Mq 实现定向消费,设置Ip白名单
程序员文章站
2022-06-28 07:56:52
rabbit Mq 实现定向消费,设置Ip白名单:初衷:为了在生产环境调试生产的问题。但是本地启动生产环境,就会产生一些不必要的问题。本地启动生产环境。就会有可能消费生产环境的消息。为了解决这一问题。我提出三种实现方案:方案一:Mq和spring集成的时候,做Ip白名单限制。在启动项目的时候就会检测本地的Ip是否属于配置的白名单Ip段(缺点:就是只能围绕)方案二:在mqsend的时候带上特定的Ip.然后在消费端进行判断,如果消费端不属于Ip白名单,那么直接再次放进mq,或者说抛异常。(......
rabbit Mq 实现定向消费,设置Ip白名单:
初衷:为了在生产环境调试生产的问题。但是本地启动生产环境,就会产生一些不必要的问题。本地启动生产环境。就会有可能消费生产环境的消息。为了解决这一问题。我提出三种实现方案:
方案一:Mq和spring集成的时候,做Ip白名单限制。在启动项目的时候就会检测本地的Ip是否属于配置的白名单Ip段(缺点:就是只能围绕)
方案二:在mq send 的时候带上特定的Ip. 然后在消费端进行判断,如果消费端不属于Ip白名单,那么直接再次放进mq,或者说抛异常。(缺点:直接放回mq,做法不好,每次放入顶端,抛异常感觉不错,但是得把Mq配置成支持事务的方式))
方案三:需要在rabbit mq 后台管理系统上面配置用户,且需要rabbit.confg 里面配置固定的白名单Ip
这三种方案:我更倾向于方案三。但是现实往往是不允许你去这样做。
方案一:代码启动之后会在mq manager管理系统里展现出来。但是会因为没有设置消费者就会报错。但是不会影响其他的业务逻辑。不妨碍你的测试。如果你想测试关于Mq的消费,那么你就只能把自己的Ip设置在白名单之内。
package com.rabbitmq;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author Kevin
* @version DCG 1.0
* @date 2020/12/24 14:25
*/
@Configuration
@PropertySource("classpath:rabbitmq.properties")
@ComponentScan
@Slf4j
@Data
public class RabbitMqConfig{
@Value("${consumers.Ip}")
private String consumersIp;
@Value("${mq.exchange}")
private String exchange;
@Value("${mq.queueName}")
private String queueName;
@Value("${routkey.consumer}")
private String routkeyConsumer;
@Value("${routkey.producer}")
private String routkeyProducer;
@Value("${current.env}")
private String env;
@Value("${mq.Listener.prefetch}")
private String prefetch;
@Value("${mq.Listener.concurrency}")
private String concurrency;
@Autowired
RabbitmqResultConsumer rabbitmqResultConsumer; //具体实现的消费者
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
public DirectExchange directExchange() {
// 支持持久化,长期不用补删除
return new DirectExchange(exchange, true, false);
}
public Queue mqQueue() {
// 支持持久化
return new Queue(queueName, true);
}
@Bean
public ThreadPoolTaskExecutor getThreadPoolTask(){
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(50);
threadPoolTaskExecutor.setMaxPoolSize(100);
threadPoolTaskExecutor.setQueueCapacity(10000);
threadPoolTaskExecutor.setKeepAliveSeconds(300);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return threadPoolTaskExecutor;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
try {
//实例化容器
container.setConnectionFactory(connectionFactory);
//concurrency 设置消费者数量
container.setConcurrentConsumers(Integer.valueOf(concurrency));
//prefetch 限制每次发送多少数据
container.setPrefetchCount(Integer.valueOf(prefetch));
//自动确认,设置手动会让 消息重试失效,没有重试需求的随意
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置线程池
container.setTaskExecutor(getThreadPoolTask());
container.setQueueNames(queueName);
rabbitAdmin(connectionFactory).declareBinding(BindingBuilder.bind(mqQueue()).to(directExchange()).with(routkeyConsumer));
// 白名单,如果本地Ip是属于我们设置的白名单里的。那么就正常的注册成功。
// 如果本地IP不属于Ip白名单范围内。那么就不设置消费者,那么就会停止这个消费者:Stopping container from aborted consumer
if (!includeIp()){
// throw new RuntimeException("Mq services are not within the scope of the MQ whitelist,Please contact administrator");
log.error("The current address, MQ is not supported. Please contact the administrator");
}else{
container.setMessageListener(rabbitmqResultConsumer);
}
}catch (RuntimeException e){
log.error("Mq service client error message:【{}】",e.getMessage());
}
return container;
}
/**
* 判断某个Ip是否属于自己定义的Ip段
* @param ip
* @param cidr
* @return
*/
public static boolean isInRange(String ip, String cidr) {
String[] ips = ip.split("\\.");
int ipAddr = (Integer.parseInt(ips[0]) << 24)
| (Integer.parseInt(ips[1]) << 16)
| (Integer.parseInt(ips[2]) << 8) | Integer.parseInt(ips[3]);
int type = Integer.parseInt(cidr.replaceAll(".*/", ""));
int mask = 0xFFFFFFFF << (32 - type);
String cidrIp = cidr.replaceAll("/.*", "");
String[] cidrIps = cidrIp.split("\\.");
int cidrIpAddr = (Integer.parseInt(cidrIps[0]) << 24)
| (Integer.parseInt(cidrIps[1]) << 16)
| (Integer.parseInt(cidrIps[2]) << 8)
| Integer.parseInt(cidrIps[3]);
return (ipAddr & mask) == (cidrIpAddr & mask);
}
private boolean includeIp(){
boolean flag = false;
try {
String ip = InetAddress.getLocalHost().getHostAddress();
List<String> consumersList = Arrays.asList(StringUtils.split(consumersIp, ","));
for (String e:consumersList){
if (isInRange(ip,e)){
flag = true;
}
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
return flag;
}
}
classpath:rabbitmq.properties:
#环境 远程连接地址
#direct exchanges
mq.exchange=
#routeKey(发送端)
routkey.producer=routkey
#routeKey(接收端)
routkey.consumer=routkey.consumer
# remote Mq Ip List(Ip 白名单)
consumers.Ip=222.222.222.0/24
#队列名称
mq.queueName=queue_name
#限制每次发送一条数据。
mq.Listener.prefetch=1
#同一个队列启动几个消费者
mq.Listener.concurrency=1
创建自己的消费者:
/**
* 消息模板队列消费者
*/
@Slf4j
@Component
public class RabbitmqDcgCalcResultConsumer implements ChannelAwareMessageListener{
@Override
public void onMessage(Message msg, Channel arg1) {
} catch (Exception e) {
log.error("消费计算结果的Message发生错误",e);
} finally {
}
}
}
好了这次就写到这里。 如果想知道方案二的实现方式。那么就在下面留言。
本文地址:https://blog.csdn.net/qq_37228713/article/details/111991401