RabbitMQ Client封装连接及业务处理接口
程序员文章站
2022-03-10 08:06:18
一、RabbitMQ介绍RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。二、rabbitMQ安装RabbitMQ Download参考安装博客三、封装RabbitMqClient.javaimport java.io.IOException;import org.apache.commons.lang....
一、RabbitMQ介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
二、RabbitMQ安装
三、封装RabbitMqClient.java
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.mina.proxy.utils.ByteUtilities;
import org.slf4j.Logger;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
/**
* RabbitMQ连接客户端工具类
* @author david(857332533@qq.com)
* 2020年12月31日
*/
@Slf4j
public class RabbitMqClient implements Runnable{
private final static Logger rabbitLogger = org.slf4j.LoggerFactory.getLogger("rabbit");
private Connection connection = null;
private Channel channel = null;
private String host = "";
private int port = 5672;
private String userName = "";
private String password = "";
private String virtualHost = "";
private String queueName = "";
private boolean isConnected = false;
private MessageHandler messageHandler;
private Thread thread;
public RabbitMqClient(String host,int port,String queueName,MessageHandler messageHandler) {
this(host, port, null, queueName, messageHandler);
}
public RabbitMqClient(String host,int port,String virtualHost,String queueName,MessageHandler messageHandler) {
this(host,port,null,null,virtualHost,queueName,messageHandler);
}
public RabbitMqClient(String host,int port,String userName,String password,String virtualHost,String queueName,MessageHandler messageHandler) {
this.host = host;
this.port = port;
this.userName = userName;
this.password = password;
this.virtualHost = virtualHost;
this.queueName = queueName;
this.messageHandler = messageHandler;
//启动
start();
//启动线程
this.thread = new Thread(this);
this.thread.start();
}
public void setMessageHandler(MessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
public Thread getThread() {
return thread;
}
public String getQueueName() {
return queueName;
}
public boolean isConnected() {
return isConnected;
}
/**
* #启动监听
*/
public void start() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
if(StringUtils.isNotBlank(userName)) {
factory.setUsername(userName);
}
if(StringUtils.isNotBlank(password)) {
factory.setPassword(password);
}
if(StringUtils.isNotBlank(virtualHost)) {
factory.setVirtualHost(virtualHost);
}
connection = factory.newConnection();
channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
handleDeliveryClient(consumerTag, envelope, properties, body);
}
};
// channel绑定队列,autoAck为true表示一旦收到消息则自动回复确认消息
channel.basicConsume(queueName, true, consumer);
isConnected = true;
log.info("start "+queueName+" success");
}catch (Exception e) {
log.error("启动MQ监听异常",e);
isConnected = false;
close();
}
}
/**
* #释放资源
*/
public void close() {
if(null != channel) {
try {
channel.close();
channel = null;
} catch (Exception e) {}
}
if(null != connection) {
try {
connection.close();
connection = null;
} catch (Exception e) {}
}
isConnected = false;
log.info("close "+queueName+" success");
}
/**
* #消息数据解析处理
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
public void handleDeliveryClient(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
rabbitLogger.info("exchang->{},routingKey->{},queueName->{}->recv:{}",
envelope.getExchange(),envelope.getRoutingKey(),this.queueName,ByteUtilities.asHex(body));
if(null != this.messageHandler) {
this.messageHandler.handlerMessage(this.queueName,body);
}
} catch (Exception e) {
log.error("RabbitMQ消息解析异常",e);
}
}
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
log.info(this.queueName+" is running");
if(!isConnected) {
log.info("重连MQ:{}",this.queueName);
start();
}
Thread.sleep(1000*60);
} catch (Exception e) {}
}
}
}
四、消息处理MessageHandler.java
/**
 * Callback for the business processing of messages received from a queue.
 */
public interface MessageHandler {

    /**
     * Processes the data of one MQ message.
     *
     * @param queueName name of the queue the message was received from
     * @param body raw message body
     */
    void handlerMessage(String queueName, byte[] body);
}
五、总结
封装RabbitMqClient对象,用于连接RabbitMQ并以自动ACK方式确认消息;同时开启一个后台线程,每隔60秒检查一次连接状态,如果当前未连接(例如连接失败),则会自动重连。
定义MessageHandler接口,用于处理业务数据。
本文地址:https://blog.csdn.net/u010989191/article/details/112220574