欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ Client封装连接及业务处理接口

程序员文章站 2022-06-14 22:02:05
一、RabbitMQ介绍RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。二、rabbitMQ安装RabbitMQ Download参考安装博客三、封装RabbitMqClient.javaimport java.io.IOException;import org.apache.commons.lang....

一、RabbitMQ介绍

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

二、rabbitMQ安装

RabbitMQ Download
参考安装博客

三、封装RabbitMqClient.java

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.mina.proxy.utils.ByteUtilities;
import org.slf4j.Logger;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import lombok.extern.slf4j.Slf4j;

/**
 * RabbitMQ连接客户端工具类
 * @author david(857332533@qq.com)
 * 2020年12月31日
 */
@Slf4j
public class RabbitMqClient implements Runnable{
	private final static Logger rabbitLogger = org.slf4j.LoggerFactory.getLogger("rabbit");
	private Connection connection = null;
	private Channel channel = null;
	private String host = "";
	private int port = 5672;
	private String userName = "";
	private String password = "";
	private String virtualHost = "";
	private String queueName = "";
	private boolean isConnected = false;
	private MessageHandler messageHandler;
	private Thread thread;
	
	public RabbitMqClient(String host,int port,String queueName,MessageHandler messageHandler) {
        this(host, port, null, queueName, messageHandler);
	}
	
	public RabbitMqClient(String host,int port,String virtualHost,String queueName,MessageHandler messageHandler) {
        this(host,port,null,null,virtualHost,queueName,messageHandler);
	}
	
	public RabbitMqClient(String host,int port,String userName,String password,String virtualHost,String queueName,MessageHandler messageHandler) {
        this.host = host;
        this.port = port;
        this.userName = userName;
        this.password = password;
        this.virtualHost = virtualHost;
        this.queueName = queueName;
        this.messageHandler = messageHandler;
        //启动
        start();
        //启动线程
        this.thread = new Thread(this);
        this.thread.start();
	}
	
	public void setMessageHandler(MessageHandler messageHandler) {
		this.messageHandler = messageHandler;
	}
	
	public Thread getThread() {
		return thread;
	}
	
	public String getQueueName() {
		return queueName;
	}
	
	public boolean isConnected() {
		return isConnected;
	}

	/**
	 * #启动监听
	 */
	public void start() {
		try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            factory.setPort(port);
            if(StringUtils.isNotBlank(userName)) {
                factory.setUsername(userName);
            }
            if(StringUtils.isNotBlank(password)) {
                factory.setPassword(password);
            }
            if(StringUtils.isNotBlank(virtualHost)) {
                factory.setVirtualHost(virtualHost);
            }
            connection = factory.newConnection();
            channel = connection.createChannel();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                	handleDeliveryClient(consumerTag, envelope, properties, body);
                }
            };
            // channel绑定队列,autoAck为true表示一旦收到消息则自动回复确认消息
            channel.basicConsume(queueName, true, consumer);
            isConnected = true;
    		log.info("start "+queueName+" success");
        }catch (Exception e) {
        	log.error("启动MQ监听异常",e);
        	isConnected = false;
        	close();
		}
	}
	
	/**
	 * #释放资源
	 */
	public void close() {
		if(null != channel) {
			try {
				channel.close();
				channel = null;
			} catch (Exception e) {}
		}
		if(null != connection) {
			try {
				connection.close();
				connection = null;
			} catch (Exception e) {}
		}
		isConnected = false;
		log.info("close "+queueName+" success");
	}
	
	/**
	 * #消息数据解析处理
	 * @param consumerTag
	 * @param envelope
	 * @param properties
	 * @param body
	 * @throws IOException
	 */
	public void handleDeliveryClient(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
		try {
			rabbitLogger.info("exchang->{},routingKey->{},queueName->{}->recv:{}",
					envelope.getExchange(),envelope.getRoutingKey(),this.queueName,ByteUtilities.asHex(body));
			if(null != this.messageHandler) {
				this.messageHandler.handlerMessage(this.queueName,body);
			}
		} catch (Exception e) {
			log.error("RabbitMQ消息解析异常",e);
		}
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		while (true) {
			try {
				log.info(this.queueName+" is running");
				if(!isConnected) {
					log.info("重连MQ:{}",this.queueName);
					start();
				}
				Thread.sleep(1000*60);
			} catch (Exception e) {}
		}
	}
}

四、消息处理MessageHandler.java

public interface MessageHandler {
	
	/**
	 * 处理MQ消息数据
	 * @param body
	 */
	public void handlerMessage(String queueName,byte[] body);
}

五、总结

封装RabbitMqClient对象,用于连接RabbitMQ,自动ACK确认消息,开启线程自动心跳检测,如果connect失败,则会自动重连。
定义MessageHandler接口,用于处理业务数据。

本文地址:https://blog.csdn.net/u010989191/article/details/112220574

相关标签: Java rabbitmq