欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rabbitmq初探

程序员文章站 2022-07-13 17:01:18
...

AMQP协议

AMQP 有四个非常重要的概念:虚拟机(virtual host),交换机(exchange),队列(queue)和绑定(binding)。

  • 虚拟机: 通常是应用的外在边界,我们可以为不同的虚拟机分配访问权限。虚拟机可持有多个交换机、队列和绑定。
  • 交换机: 从连接通道(Channel)接收消息,并按照特定的路由规则发送给队列。
  • 队列: 消息最终的存储容器,直到消费客户端(Consumer)将其取走。
  • 绑定: 也就是所谓的路由规则,告诉交换机将何种类型的消息发送到某个队列中。

通常的操作流程是:

  • (1) 消费者: 创建信息通道。
  • (2) 消费者: 定义消息队列。
  • (3) 消费者: 定义特定类型的交换机。
  • (4) 消费者: 设定绑定规则 (包括交换机名称、队列名称以及路由键)。
  • (5) 消费者: 等待消息。
  • (6) 生产者: 创建消息。
  • (7) 生产者: 将消息投递给信息通道 (注明接收交换机名称和路由键)。
  • (8) 交换机: 获取消息,依据交换机类型决定是否匹配路由规则 (如需匹配,则对比消息路由键和绑定路由键)。
  • (9) 消费者: 获取并处理消息,发送反馈。
  • (10) 结束: 关闭通道和连接。

 

RabbitMq介绍

由erlang(面向并发的变成语言)开发,遵循AMQP协议的消息代理。原理简单,通过接受、转发消息。

 

应用场景

处理无需及时返回、耗时的操作,采用异步处理,能够大大节省服务器的请求响应时间,提高系统吞吐量。 

 

概念答疑

生产者:发送消息的程序;

消费者:等待接受消息、并且处理消息的程序,如果处理大量堆积消息,只需要增加更多的消费者。

队列:存储消息,一个队列没有范围限制,本质上无限大的缓存。

MQ服务器:消息缓存服务器,一旦消息正确传递给消费者,消息会立即从内存删除。

 

系统架构

连接(connection):客户端和rabbitmq server之间的tcp连接;

虚拟连接(channel):消费者和生产者通过tcp连接到rabbitmq server,channel是建立在tcp连接上,避免频繁建立关闭tcp连接,影响系统性能,而且系统的tcp连接数有限制,从而限制系统处理高并发能力。有实验表明,1s的数据可以Publish10K的数据包(普通硬件环境)。

消息确认:一个消息确认是由消费者发出,告诉RabbitMQ这个消息已经被接受,处理完成,RabbitMQ 可以删除它了,保证消息不会丢失;

消息持久化:持久化消息到硬盘,保证不会丢失,但依旧有个短暂的时间窗口。需要更健壮的持久化保证,你可以使用出版者确认。

 

Exhanges

direct: routing key 匹配, 那么Message就会被传递到相应的queue中。

fanout:向响应的queue广播

topic: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue

 

代码DEMO

生产者
import com.whtr.eam.common.util.SystemContext;
import com.whtr.eam.platform.interfaceMgr.engine.ProtocolException;
import com.whtr.eam.platform.interfaceMgr.vo.DataVo;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;

package com.whtr.eam.platform.interfaceMgr.util;

import com.whtr.eam.common.util.SystemContext;
import com.whtr.eam.platform.interfaceMgr.engine.ProtocolException;
import com.whtr.eam.platform.interfaceMgr.vo.DataVo;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;

/**
 * 消息生产者
 * <p/>
 * Created by zhongmin 20167.5.
 */
public class ProtocolProducer extends ProtocolEndPoint {

    public ProtocolProducer(DataVo data) throws IOException {
        super(data.getName(), data.getExchangeName(), data.getExchangType(), data.getRoutingKey());
        setHost(data.getUrl());
        setPort(data.getPort());
        setUserName(data.getUserName());
        setPassword(data.getPassword());
        init();
    }



    public void sendMessage(String message) throws Exception {
        if(StringUtils.isBlank(message)){
            logger.info("待发送的消息为空");
            throw new ProtocolException("请求参数为空,无法发送消息!");
        }

        if(StringUtils.isNotBlank(getExchangeName())){
            channel.basicPublish(getExchangeName(), getRoutingKey(), null, message.getBytes());
        }else{
            channel.basicPublish("", getQueueName(), null, message.getBytes());
        }
    }

    public static void main(String[] args) throws Exception {
        String queueName = "q.lee";
        String exchangeName = "e.lee";
        String exchangType = "topic";
        String routingKey = "r.lee";

        DataVo data = new DataVo();
        data.setName(queueName);
        data.setExchangeName(exchangeName);
        data.setExchangType(exchangType);
        data.setRoutingKey(routingKey);
        data.setReqParam("lee mq");
        /**
         # mq.host=192.168.1.43
         mq.host=rabmq.inside.ppmoney
         mq.port=5672
         mq.user=guest
         mq.password=guest
         */
        data.setUserName("guest");
        data.setPassword("guest");
        data.setUrl("192.168.1.43");
        data.setPort(15672);
        
        ProtocolProducer protocolProducer = new ProtocolProducer(data);
        protocolProducer.sendMessage("leee");
    }
}

public class ProtocolProducer extends ProtocolEndPoint {

    public ProtocolProducer(DataVo data) throws IOException {
        super(data.getName(), data.getExchangeName(), data.getExchangType(), data.getRoutingKey());
        setHost(data.getUrl());
        setPort(data.getPort());
        setUserName(data.getUserName());
        setPassword(data.getPassword());
        init();
    }



    public void sendMessage(String message) throws Exception {
        if(StringUtils.isBlank(message)){
            logger.info("待发送的消息为空");
            throw new ProtocolException("请求参数为空,无法发送消息!");
        }

        if(StringUtils.isNotBlank(getExchangeName())){
            channel.basicPublish(getExchangeName(), getRoutingKey(), null, message.getBytes());
        }else{
            channel.basicPublish("", getQueueName(), null, message.getBytes());
        }
    }

    public static void main(String[] args) throws Exception {
        String queueName = "q.lee";
        String exchangeName = "e.lee";
        String exchangType = "topic";
        String routingKey = "r.lee";

        DataVo data = new DataVo();
        data.setName(queueName);
        data.setExchangeName(exchangeName);
        data.setExchangType(exchangType);
        data.setRoutingKey(routingKey);
        data.setReqParam("lee mq");
        /**
         # mq.host=192.168.1.43
         mq.host=rabmq.inside.ppmoney
         mq.port=5672
         mq.user=guest
         mq.password=guest
         */
        data.setUserName("guest");
        data.setPassword("guest");
        data.setUrl("192.168.1.43");
        data.setPort(15672);
        
        ProtocolProducer protocolProducer = new ProtocolProducer(data);
        protocolProducer.sendMessage("leee");
    }
}


队列抽象者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * 队列的抽象类
 * <p/>
 * Created by liangqq on 2015/12/7.
 */
public abstract class ProtocolEndPoint {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    protected Channel channel;
    protected Connection connection;
    protected String queueName;
    protected String exchangeName;
    protected String exchangeType;
    protected String routingKey;

    private ConnectionFactory factory;


    private String host;
    private Integer port;
    private String userName;
    private String password;

    public ProtocolEndPoint(String queueName, String exchangeName, String exchangeType, String routingKey) throws IOException {
        this.queueName = queueName;
        this.exchangeName = exchangeName;
        this.exchangeType = exchangeType;
        this.routingKey = routingKey;
    }

    public void init() throws IOException {
        ConnectionFactory factory = getFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(userName);
        factory.setPassword(password);
        factory.setAutomaticRecoveryEnabled(true);

        connection = factory.newConnection();
        connection.addShutdownListener(new MqShutdownListener());
        channel = connection.createChannel();
        if (!bindExchange()) {
            channel.queueDeclare(queueName, false, false, false, null);
        }
    }

    public ProtocolEndPoint(String queueName) throws IOException {
        this(queueName, null, null, null);
    }

    private boolean bindExchange() throws IOException {
        if (StringUtils.isNotBlank(exchangeName)) {
            String type = StringUtils.isNotBlank(exchangeType) ? exchangeType : "topic";
            channel.exchangeDeclare(exchangeName, type, true);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            return true;
        }

        return false;
    }

    /**
     * 关闭channel和connection。并非必须,因为隐含是自动调用的
     */
    public void close() {
        try {
            this.channel.close();
            this.connection.close();
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        } finally {
            if (null != channel) {
                try {
                    this.channel.close();
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                }
            }
            if (null != connection) {
                try {
                    this.connection.close();
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }

    public String getQueueName() {
        return queueName;
    }

    public String getExchangeName() {
        return exchangeName;
    }

    public String getExchangeType() {
        return exchangeType;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public ConnectionFactory getFactory() {
        if(null == factory){
            factory = new ConnectionFactory();
        }
        return factory;
    }

    public void setFactory(ConnectionFactory factory) {
        this.factory = factory;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}



import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import org.nutz.json.Json;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 监听队列关闭的监听者
 *
 * Created by zhongminon 2016.7.5.
 */
public class MqShutdownListener implements ShutdownListener {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void shutdownCompleted(ShutdownSignalException cause) {
        logger.error("队列关闭:{}", Json.toJson(cause));
    }
}


 

消费者
public class ConsoumerDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

 

 参考更详细教程:http://blog.csdn.net/anzhsoft/article/details/19563091

 RabbitMq控制台:http://www.cnblogs.com/dubing/p/4017613.html

相关标签: rabbitmq amqp