欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

rabbit Mq 实现定向消费,设置Ip白名单

程序员文章站 2022-03-19 16:05:59
rabbit Mq 实现定向消费,设置Ip白名单:初衷:为了在生产环境调试生产的问题。但是本地启动生产环境,就会产生一些不必要的问题。本地启动生产环境。就会有可能消费生产环境的消息。为了解决这一问题。我提出三种实现方案:方案一:Mq和spring集成的时候,做Ip白名单限制。在启动项目的时候就会检测本地的Ip是否属于配置的白名单Ip段(缺点:就是只能围绕)方案二:在mqsend的时候带上特定的Ip.然后在消费端进行判断,如果消费端不属于Ip白名单,那么直接再次放进mq,或者说抛异常。(......

 

rabbit Mq 实现定向消费,设置Ip白名单:

初衷:为了在生产环境调试生产的问题。但是本地启动生产环境,就会产生一些不必要的问题。本地启动生产环境。就会有可能消费生产环境的消息。为了解决这一问题。我提出三种实现方案:

方案一:Mq和spring集成的时候,做Ip白名单限制。在启动项目的时候就会检测本地的Ip是否属于配置的白名单Ip段(缺点:就是只能围绕)
方案二:在mq send 的时候带上特定的Ip. 然后在消费端进行判断,如果消费端不属于Ip白名单,那么直接再次放进mq,或者说抛异常。(缺点:直接放回mq,做法不好,每次放入顶端,抛异常感觉不错,但是得把Mq配置成支持事务的方式))
方案三:需要在rabbit mq 后台管理系统上面配置用户,且需要rabbit.confg 里面配置固定的白名单Ip   

这三种方案:我更倾向于方案三。但是现实往往是不允许你去这样做。

方案一:代码启动之后会在mq manager管理系统里展现出来。但是会因为没有设置消费者就会报错。但是不会影响其他的业务逻辑。不妨碍你的测试。如果你想测试关于Mq的消费,那么你就只能把自己的Ip设置在白名单之内。

package com.rabbitmq;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author Kevin
 * @version DCG 1.0
 * @date 2020/12/24 14:25
 */
@Configuration
@PropertySource("classpath:rabbitmq.properties")
@ComponentScan
@Slf4j
@Data
public class RabbitMqConfig{
    @Value("${consumers.Ip}")
    private String consumersIp;
    @Value("${mq.exchange}")
    private String exchange;
    @Value("${mq.queueName}")
    private String queueName;
    @Value("${routkey.consumer}")
    private String routkeyConsumer;
    @Value("${routkey.producer}")
    private String routkeyProducer;
    @Value("${current.env}")
    private String env;
    @Value("${mq.Listener.prefetch}")
    private String prefetch;
    @Value("${mq.Listener.concurrency}")
    private String concurrency;
    @Autowired
    RabbitmqResultConsumer rabbitmqResultConsumer;  //具体实现的消费者

    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    public DirectExchange directExchange() {
        // 支持持久化,长期不用补删除
        return new DirectExchange(exchange, true, false);
    }


    public Queue mqQueue() {
        // 支持持久化
        return new Queue(queueName, true);
    }


    @Bean
    public ThreadPoolTaskExecutor getThreadPoolTask(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(100);
        threadPoolTaskExecutor.setQueueCapacity(10000);
        threadPoolTaskExecutor.setKeepAliveSeconds(300);
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        return threadPoolTaskExecutor;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        try {
            //实例化容器
            container.setConnectionFactory(connectionFactory);
            //concurrency  设置消费者数量
            container.setConcurrentConsumers(Integer.valueOf(concurrency));
            //prefetch 限制每次发送多少数据
            container.setPrefetchCount(Integer.valueOf(prefetch));
            //自动确认,设置手动会让 消息重试失效,没有重试需求的随意
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            //设置线程池
            container.setTaskExecutor(getThreadPoolTask());
            container.setQueueNames(queueName);
            rabbitAdmin(connectionFactory).declareBinding(BindingBuilder.bind(mqQueue()).to(directExchange()).with(routkeyConsumer));
            // 白名单,如果本地Ip是属于我们设置的白名单里的。那么就正常的注册成功。
            // 如果本地IP不属于Ip白名单范围内。那么就不设置消费者,那么就会停止这个消费者:Stopping container from aborted consumer
            if (!includeIp()){
//                throw new RuntimeException("Mq services are not within the scope of the MQ whitelist,Please contact administrator");
                log.error("The current address, MQ is not supported. Please contact the administrator");
            }else{
                container.setMessageListener(rabbitmqResultConsumer);
            }
        }catch (RuntimeException e){
            log.error("Mq service client error message:【{}】",e.getMessage());
        }
        return container;
    }

    /**
     * 判断某个Ip是否属于自己定义的Ip段
     * @param ip
     * @param cidr
     * @return
     */
    public static boolean isInRange(String ip, String cidr) {
        String[] ips = ip.split("\\.");
        int ipAddr = (Integer.parseInt(ips[0]) << 24)
                | (Integer.parseInt(ips[1]) << 16)
                | (Integer.parseInt(ips[2]) << 8) | Integer.parseInt(ips[3]);
        int type = Integer.parseInt(cidr.replaceAll(".*/", ""));
        int mask = 0xFFFFFFFF << (32 - type);
        String cidrIp = cidr.replaceAll("/.*", "");
        String[] cidrIps = cidrIp.split("\\.");
        int cidrIpAddr = (Integer.parseInt(cidrIps[0]) << 24)
                | (Integer.parseInt(cidrIps[1]) << 16)
                | (Integer.parseInt(cidrIps[2]) << 8)
                | Integer.parseInt(cidrIps[3]);

        return (ipAddr & mask) == (cidrIpAddr & mask);
    }

    private boolean includeIp(){
        boolean flag = false;
        try {
            String ip = InetAddress.getLocalHost().getHostAddress();
            List<String> consumersList = Arrays.asList(StringUtils.split(consumersIp, ","));
            for (String e:consumersList){
                if (isInRange(ip,e)){
                    flag =  true;
                }
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return flag;
    }
}

 classpath:rabbitmq.properties:

#环境 远程连接地址
#direct exchanges
mq.exchange=

#routeKey(发送端)
routkey.producer=routkey
#routeKey(接收端)
routkey.consumer=routkey.consumer

# remote Mq Ip List(Ip 白名单)
consumers.Ip=222.222.222.0/24

#队列名称
mq.queueName=queue_name
#限制每次发送一条数据。
mq.Listener.prefetch=1
#同一个队列启动几个消费者
mq.Listener.concurrency=1

创建自己的消费者:



/**
 * 消息模板队列消费者
 */
@Slf4j
@Component
public class RabbitmqDcgCalcResultConsumer implements ChannelAwareMessageListener{
	
	@Override
	public void onMessage(Message msg, Channel arg1) {
		
		} catch (Exception e) {
			log.error("消费计算结果的Message发生错误",e);
		} finally {
			
		}
	}
}

好了这次就写到这里。  如果想知道方案二的实现方式。那么就在下面留言。

本文地址:https://blog.csdn.net/qq_37228713/article/details/111991401

相关标签: rabbitmq java