欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

消息队列

程序员文章站 2022-04-23 15:21:33
消息队列 前言: 说实话,最近还是比较忙的,手上素材倒是一大把,但是大多只是初步整理了。但是博客这种东西还是要写的,果然后面还是要放低一下排版要求(扩展性的一些东西也少提一些)。 简介: 消息队列这个东西,其实网上的资料还是很多的。我就简单说一些自己的认识与源代码哈。 演变: 我是很喜欢了解技术演进 ......

消息队列

前言:

说实话,最近还是比较忙的,手上素材倒是一大把,但是大多只是初步整理了。但是博客这种东西还是要写的,果然后面还是要放低一下排版要求(扩展性的一些东西也少提一些)。

简介:

消息队列这个东西,其实网上的资料还是很多的。我就简单说一些自己的认识与源代码哈。

演变:

我是很喜欢了解技术演进的,因为演进的过程展现了前辈们的智慧。

最早的程序串行执行就不说了。

程序调用中的方法调用,往往调用方与被调用方都存在与同一内存空间(从java角度说,都是在同一jvm中),所以方法调用的逻辑不会太复杂。简单来说,就是调用方(java中其实就是目标对象)将被调用方压入java虚拟机栈,从而执行(详见jvm)。或者等我什么时候,把我有关jvm的笔记贴出来(嘿嘿)。

后来呢,就是出现了对非本地jvm方法调用的需求(举个例子,我需要调用第三方的方法,如果每次都要双方都写一个专门的处理服务(在当时,也许接口更为准确),比较麻烦),那么就有了rpc与rmi的一个需要。那么在java中就出现了一个stub的技术,定义好后,相关方法就像调用本地一样(详见《head first java》相关章节)。当然了,这个时候已经有了中间件的概念了,所以也就有了corba等框架。谈到中间件,感兴趣的,可以去查询一下当时主流的中间件分类(如rpc,rmi,mom,tpm,orb)。

那么到了现在呢,分布式系统的通信可以按照同步与异步分为两大支柱。之所以这么理解,是因为分布式系统往往同步通信与异步通信都是需要的。简单提一下,同步通信业务逻辑相对简单,实现快速,可以实时获得回应,但耦合度较高。异步通信耦合度低,并可以进行消息堆积,消峰,但无法实时获取回应,业务逻辑复杂,从而提高系统复杂度(尤其当一条业务线与多层异步逻辑)等。之后有机会,我会举例细述。

当然了,在本篇中,只简单谈一下异步通信的主流实现-消息队列。

选择:

选择方面,我就不多说了,目前只用过rabbitmq,rocketmq,kafka。网上有关消息队列选择的文章很多,很细致,我就不赘述了。

代码实现:

这里贴出来的都是实际生产代码(如果内部版本也算的话,嘿嘿),所以如果有一些不是很熟悉的类,请查看import,是否是项目自身的类。或者也可以直接询问我。

初步实现:

这里的初步实现,是根据rabbitmq的原生方法进行编写(详细参考:《rabbitmq实战指南》第一章的两个代码清单及第二章的相关解释)。

producer:​​

package com.renewable.gateway.rabbitmq.producer;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.messageproperties;
import com.renewable.gateway.pojo.terminal;
import com.renewable.gateway.util.jsonutil;
import com.renewable.gateway.util.propertiesutil;
import org.springframework.stereotype.component;

import java.io.ioexception;
import java.util.concurrent.timeoutexception;

import static com.renewable.gateway.common.constant.rabbitmqconstant.*;

/**
 * @description:
 * @author: jarry
 */
@component("terminalproducer")
public class terminalproducer {

    private static final string ip_address = propertiesutil.getproperty(rabbitmq_host);
    private static final int port = integer.parseint(propertiesutil.getproperty(rabbitmq_port));
    private static final string user_name = propertiesutil.getproperty(rabbitmq_user_name);
    private static final string user_password = propertiesutil.getproperty(rabbitmq_user_password);

    private static final string terminal_config_terminal2centcontrol_exchange = "exchange-terminal-config-terminal2centcontrol";
    private static final string terminal_config_terminal2centcontrol_queue = "queue-terminal-config-terminal2centcontrol";
    private static final string terminal_config_terminal2centcontrol_routinetype = "topic";
    private static final string terminal_config_terminal2centcontrol_bindingkey = "terminal.config.terminal2centcontrol";
    private static final string terminal_config_terminal2centcontrol_routingkey = "terminal.config.terminal2centcontrol";

    public static void sendterminalconfig(terminal terminal) throws ioexception, timeoutexception, interruptedexception {

        connectionfactory factory = new connectionfactory();
        factory.sethost(ip_address);
        factory.setport(port);
        factory.setusername(user_name);
        factory.setpassword(user_password);

        connection connection = factory.newconnection();
        channel channel = connection.createchannel();
        channel.exchangedeclare(terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_routinetype, true, false, null);
        channel.queuedeclare(terminal_config_terminal2centcontrol_queue, true, false, false, null);
        channel.queuebind(terminal_config_terminal2centcontrol_queue, terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_bindingkey);

        string terminalstr = jsonutil.obj2string(terminal);
        channel.basicpublish(terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_routingkey, messageproperties.persistent_text_plain, terminalstr.getbytes());

        channel.close();
        connection.close();
    }
}

consumer:

package com.renewable.gateway.rabbitmq.consumer;

import com.rabbitmq.client.*;
import com.renewable.gateway.common.guavacache;
import com.renewable.gateway.common.serverresponse;
import com.renewable.gateway.pojo.terminal;
import com.renewable.gateway.service.iterminalservice;
import com.renewable.gateway.util.jsonutil;
import com.renewable.gateway.util.propertiesutil;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;

import javax.annotation.postconstruct;
import java.io.ioexception;
import java.util.concurrent.timeoutexception;

import static com.renewable.gateway.common.constant.cacheconstant.terminal_mac;
import static com.renewable.gateway.common.constant.rabbitmqconstant.*;

/**
 * @description:
 * @author: jarry
 */
@component
public class terminalconsumer {

    @autowired
    private iterminalservice iterminalservice;

    private static final string terminal_config_centcontrol2terminal_exchange = "exchange-terminal-config-centcontrol2terminal";
    private static final string terminal_config_centcontrol2terminal_queue = "queue-terminal-config-centcontrol2terminal";
    private static final string terminal_config_centcontrol2terminal_routinetype = "topic";
    private static final string terminal_config_centcontrol2terminal_bindingkey = "terminal.config.centcontrol2terminal";

    @postconstruct
    public void messageonterminal() throws ioexception, timeoutexception, interruptedexception {
        address[] addresses = new address[]{
                new address(propertiesutil.getproperty(rabbitmq_host))
        };
        connectionfactory factory = new connectionfactory();
        factory.setusername(propertiesutil.getproperty(rabbitmq_user_name));
        factory.setpassword(propertiesutil.getproperty(rabbitmq_user_password));

        connection connection = factory.newconnection(addresses);
        final channel channel = connection.createchannel();
        channel.basicqos(64);   // 设置客户端最多接收未ack的消息个数,避免客户端被冲垮(常用于限流)
        consumer consumer = new defaultconsumer(channel) {


            @override
            public void handledelivery(string consumertag,
                                       envelope envelope,
                                       amqp.basicproperties properties,
                                       byte[] body) throws ioexception {
                // 1.接收数据,并反序列化出对象
                terminal receiveterminalconfig = jsonutil.string2obj(new string(body), terminal.class);

                // 2.验证是否是该终端的消息的消息     // 避免ack其他终端的消息
                if (receiveterminalconfig.getmac() == guavacache.getkey(terminal_mac)) {
                    // 业务代码
                    serverresponse response = iterminalservice.receiveterminalfromrabbitmq(receiveterminalconfig);
                    if (response.issuccess()) {
                        channel.basicack(envelope.getdeliverytag(), false);
                    }
                }
            }
        };
        channel.basicconsume(terminal_config_centcontrol2terminal_queue, consumer);
        // 等回调函数执行完毕后,关闭资源
        // 想了想还是不关闭资源,保持一个监听的状态,从而确保配置的实时更新
        //        timeunit.seconds.sleep(5);
        //        channel.close();
        //        connection.close();
    }
}

小结:

这是早期写的一个demo代码,是直接参照源码的。如果是学习rabbitmq的话,还是建议手写一下这种比较原始的程序,了解其中每个方法的作用,从而理解rabbitmq的思路。如果条件允许的话,还可以查看一下rabbitmq的底层通信协议-amqp(如果不方便下载,也可以私聊我)。

当然,此处可以通过@value直接导入相关配置(乃至到了springcloud后,可以通过@refreshscope等实现配置自动更新)。

与spring集成:

producer:

package com.renewable.terminal.rabbitmq.producer;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.messageproperties;
import com.renewable.terminal.pojo.terminal;
import com.renewable.terminal.util.jsonutil;
import org.springframework.stereotype.component;

import java.io.ioexception;
import java.util.concurrent.timeoutexception;

/**
 * @description:
 * @author: jarry
 */
@component("terminalproducer")
public class terminalproducer {

    private static string rabbitmqhost = "47.92.249.250";
    private static string rabbitmquser = "admin";
    private static string rabbitmqpassword = "123456";
    private static string rabbitmqport = "5672";

    private static final string ip_address = rabbitmqhost;
    private static final int port = integer.parseint(rabbitmqport);
    private static final string user_name = rabbitmquser;
    private static final string user_password = rabbitmqpassword;

    private static final string terminal_config_terminal2centcontrol_exchange = "exchange-terminal-config-terminal2centcontrol";
    private static final string terminal_config_terminal2centcontrol_queue = "queue-terminal-config-terminal2centcontrol";
    private static final string terminal_config_terminal2centcontrol_routinetype = "topic";
    private static final string terminal_config_terminal2centcontrol_bindingkey = "terminal.config.terminal2centcontrol";
    private static final string terminal_config_terminal2centcontrol_routingkey = "terminal.config.terminal2centcontrol";

    public static void sendterminalconfig(terminal terminal) throws ioexception, timeoutexception, interruptedexception {

        connectionfactory factory = new connectionfactory();
        factory.sethost(ip_address);
        factory.setport(port);
        factory.setusername(user_name);
        factory.setpassword(user_password);

        connection connection = factory.newconnection();
        channel channel = connection.createchannel();
        channel.exchangedeclare(terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_routinetype, true, false, null);
        channel.queuedeclare(terminal_config_terminal2centcontrol_queue, true, false, false, null);
        channel.queuebind(terminal_config_terminal2centcontrol_queue, terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_bindingkey);

        string terminalstr = jsonutil.obj2string(terminal);
        channel.basicpublish(terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_routingkey, messageproperties.persistent_text_plain, terminalstr.getbytes());

        channel.close();
        connection.close();
    }
}

consumer:

package com.renewable.terminal.rabbitmq.consumer;

import com.rabbitmq.client.*;
import com.renewable.terminal.init.serialsensorinit;
import com.renewable.terminal.init.terminalinit;
import com.renewable.terminal.common.guavacache;
import com.renewable.terminal.common.serverresponse;
import com.renewable.terminal.pojo.terminal;
import com.renewable.terminal.service.iterminalservice;
import com.renewable.terminal.util.jsonutil;
import lombok.extern.slf4j.slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.amqpheaders;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.messaging.handler.annotation.headers;
import org.springframework.messaging.handler.annotation.payload;
import org.springframework.stereotype.component;

import javax.annotation.postconstruct;
import java.io.ioexception;
import java.util.map;
import java.util.concurrent.timeoutexception;

import static com.renewable.terminal.common.constant.cacheconstant.terminal_id;
import static com.renewable.terminal.common.constant.cacheconstant.terminal_mac;

/**
 * @description:
 * @author: jarry
 */
@component
@slf4j
public class terminalconsumer {

    @autowired
    private iterminalservice iterminalservice;

    @autowired
    private serialsensorinit serialsensorinit;


    private static final string terminal_config_terminal2centcontrol_exchange = "exchange-terminal-config-centcontrol2terminal";
    private static final string terminal_config_terminal2centcontrol_queue = "queue-terminal-config-centcontrol2terminal";
    private static final string terminal_config_terminal2centcontrol_routinetype = "topic";
    private static final string terminal_config_terminal2centcontrol_bindingkey = "terminal.config.centcontrol2terminal";

    //todo_finished 2019.05.16 完成终端机terminalconfig的接收与判断(id是否为长随机数,是否需要重新分配)
    @rabbitlistener(bindings = @queuebinding(
            value = @queue(value = terminal_config_terminal2centcontrol_queue, declare = "true"),
            exchange = @exchange(value = terminal_config_terminal2centcontrol_exchange, declare = "true", type = terminal_config_terminal2centcontrol_routinetype),
            key = terminal_config_terminal2centcontrol_bindingkey
    ))
    @rabbithandler
    public void messageonterminal(@payload string terminalstr, @headers map<string, object> headers, channel channel) throws ioexception {

        terminal terminal = jsonutil.string2obj(terminalstr, terminal.class);
        if (terminal == null){
            log.info("consume the null terminal config !");
            long deliverytag = (long) headers.get(amqpheaders.delivery_tag);
            channel.basicack(deliverytag, false);
        }
        if (!guavacache.getkey(terminal_mac).equals(terminal.getmac())){
            log.info("refuse target terminal with mac({}) configure to this terminal with mac({}).",terminal.getmac(), guavacache.getkey(terminal_mac));
            return;
        }

        // 2.业务逻辑
        serverresponse response = iterminalservice.receiveterminalfromrabbitmq(terminal);
        log.info("start serialsensorinit");
        serialsensorinit.init();

        // 3.确认
        if (response.issuccess()) {
            long deliverytag = (long) headers.get(amqpheaders.delivery_tag);
            channel.basicack(deliverytag, false);
        }
    }
}

配置:

# rabbitmq 消费端配置
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 5
        max-concurrency: 10
        acknowledge-mode: manual
        # 限流
        prefetch: 1
    host: "localhost"
    port: 5672
    username: "admin"
    password: "123456"
    virtual-host: "/"
    connection-timeout: 15000

小结:

这里不得不赞一下spring,它通过提供rabbitmq地封装api-ampq,极大地简化了消息队列的代码。其实上述方法就是通过ampq的注解与yml配置来迅速实现rabbitmq的使用。

当然,这里还有很多的提升空间。比如说,通过@bean注解(建立目标配置)与公用方法提取,可以有效提高代码复用性。

简单扩展(与springstream集成):

这段代码并不是线上的代码,而是慕课网学习时留下的代码。主要实际生产中并没有使用springstream,但这确实是认识事件驱动模型的要给很好途径。

producer:

package com.imooc.order.message;

import org.springframework.cloud.stream.annotation.input;
import org.springframework.cloud.stream.annotation.output;
import org.springframework.messaging.messagechannel;
import org.springframework.messaging.subscribablechannel;

/**
 * @description:
 * @author: jarry
 */
public interface streamclient {

    string input = "mymessage";
    string input2 = "mymessageack";


    @input(streamclient.input)
    subscribablechannel input();

    @output(streamclient.input)
    messagechannel output();

    @input(streamclient.input2)
    subscribablechannel input2();

    @output(streamclient.input2)
    messagechannel output2();
}

package com.imooc.order;

import org.junit.assert;
import org.junit.test;
import org.springframework.amqp.core.amqptemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;

import java.util.date;

/**
 * @description:
 * @author: jarry
 */
@component
public class mqsendertest extends orderapplicationtests{

    @autowired
    private amqptemplate amqptemplate;

    @test
    public void send(){
        amqptemplate.convertandsend("myqueue", "now: " + new date());
        assert.assertnotnull(new date());
    }
}

consumer:

package com.imooc.order.message;

import com.imooc.order.dto.orderdto;
import lombok.extern.slf4j.slf4j;
import org.springframework.cloud.stream.annotation.enablebinding;
import org.springframework.cloud.stream.annotation.streamlistener;
import org.springframework.messaging.handler.annotation.sendto;
import org.springframework.stereotype.component;

/**
 * @description:
 * @author: jarry
 */
@component
@enablebinding(streamclient.class)
@slf4j
public class streamreceiver {

//  @streamlistener(streamclient.input)
//  public void process(object message){
//      log.info("streamreceiver: {}", message);
//  }

    @streamlistener(streamclient.input)
    // 增加以下注解,可以在input消息消费后,返回一个消息。说白了就是rabbitmq对消息消费后的确认回调函数(貌似叫这个,意思就这样,之后细查)
    @sendto(streamclient.input2)
    public string process(orderdto message){
        log.info("streamreceiver: {}", message);
        return "received.";
    }

    @streamlistener(streamclient.input2)
    public void process2(string message){
        log.info("streamreceiver2: {}", message);
    }
}

总结:

在学习技术的过程中,一方面不断地感受到自己对技术了解的不足,另一方面则是发现更重要的是系统设计中技术选型的权衡。