消息队列
消息队列
前言:
说实话,最近还是比较忙的,手上素材倒是一大把,但是大多只是初步整理了。但是博客这种东西还是要写的,果然后面还是要放低一下排版要求(扩展性的一些东西也少提一些)。
简介:
消息队列这个东西,其实网上的资料还是很多的。我就简单说一些自己的认识与源代码哈。
演变:
我是很喜欢了解技术演进的,因为演进的过程展现了前辈们的智慧。
最早的程序串行执行就不说了。
程序调用中的方法调用,往往调用方与被调用方都存在与同一内存空间(从java角度说,都是在同一jvm中),所以方法调用的逻辑不会太复杂。简单来说,就是调用方(java中其实就是目标对象)将被调用方压入java虚拟机栈,从而执行(详见jvm)。或者等我什么时候,把我有关jvm的笔记贴出来(嘿嘿)。
后来呢,就是出现了对非本地jvm方法调用的需求(举个例子,我需要调用第三方的方法,如果每次都要双方都写一个专门的处理服务(在当时,也许接口更为准确),比较麻烦),那么就有了rpc与rmi的一个需要。那么在java中就出现了一个stub的技术,定义好后,相关方法就像调用本地一样(详见《head first java》相关章节)。当然了,这个时候已经有了中间件的概念了,所以也就有了corba等框架。谈到中间件,感兴趣的,可以去查询一下当时主流的中间件分类(如rpc,rmi,mom,tpm,orb)。
那么到了现在呢,分布式系统的通信可以按照同步与异步分为两大支柱。之所以这么理解,是因为分布式系统往往同步通信与异步通信都是需要的。简单提一下,同步通信业务逻辑相对简单,实现快速,可以实时获得回应,但耦合度较高。异步通信耦合度低,并可以进行消息堆积,消峰,但无法实时获取回应,业务逻辑复杂,从而提高系统复杂度(尤其当一条业务线与多层异步逻辑)等。之后有机会,我会举例细述。
当然了,在本篇中,只简单谈一下异步通信的主流实现-消息队列。
选择:
选择方面,我就不多说了,目前只用过rabbitmq,rocketmq,kafka。网上有关消息队列选择的文章很多,很细致,我就不赘述了。
代码实现:
这里贴出来的都是实际生产代码(如果内部版本也算的话,嘿嘿),所以如果有一些不是很熟悉的类,请查看import,是否是项目自身的类。或者也可以直接询问我。
初步实现:
这里的初步实现,是根据rabbitmq的原生方法进行编写(详细参考:《rabbitmq实战指南》第一章的两个代码清单及第二章的相关解释)。
producer:
package com.renewable.gateway.rabbitmq.producer; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; import com.rabbitmq.client.messageproperties; import com.renewable.gateway.pojo.terminal; import com.renewable.gateway.util.jsonutil; import com.renewable.gateway.util.propertiesutil; import org.springframework.stereotype.component; import java.io.ioexception; import java.util.concurrent.timeoutexception; import static com.renewable.gateway.common.constant.rabbitmqconstant.*; /** * @description: * @author: jarry */ @component("terminalproducer") public class terminalproducer { private static final string ip_address = propertiesutil.getproperty(rabbitmq_host); private static final int port = integer.parseint(propertiesutil.getproperty(rabbitmq_port)); private static final string user_name = propertiesutil.getproperty(rabbitmq_user_name); private static final string user_password = propertiesutil.getproperty(rabbitmq_user_password); private static final string terminal_config_terminal2centcontrol_exchange = "exchange-terminal-config-terminal2centcontrol"; private static final string terminal_config_terminal2centcontrol_queue = "queue-terminal-config-terminal2centcontrol"; private static final string terminal_config_terminal2centcontrol_routinetype = "topic"; private static final string terminal_config_terminal2centcontrol_bindingkey = "terminal.config.terminal2centcontrol"; private static final string terminal_config_terminal2centcontrol_routingkey = "terminal.config.terminal2centcontrol"; public static void sendterminalconfig(terminal terminal) throws ioexception, timeoutexception, interruptedexception { connectionfactory factory = new connectionfactory(); factory.sethost(ip_address); factory.setport(port); factory.setusername(user_name); factory.setpassword(user_password); connection connection = factory.newconnection(); channel channel = connection.createchannel(); channel.exchangedeclare(terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_routinetype, true, false, null); channel.queuedeclare(terminal_config_terminal2centcontrol_queue, true, false, false, null); channel.queuebind(terminal_config_terminal2centcontrol_queue, terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_bindingkey); string terminalstr = jsonutil.obj2string(terminal); channel.basicpublish(terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_routingkey, messageproperties.persistent_text_plain, terminalstr.getbytes()); channel.close(); connection.close(); } }
consumer:
package com.renewable.gateway.rabbitmq.consumer; import com.rabbitmq.client.*; import com.renewable.gateway.common.guavacache; import com.renewable.gateway.common.serverresponse; import com.renewable.gateway.pojo.terminal; import com.renewable.gateway.service.iterminalservice; import com.renewable.gateway.util.jsonutil; import com.renewable.gateway.util.propertiesutil; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.component; import javax.annotation.postconstruct; import java.io.ioexception; import java.util.concurrent.timeoutexception; import static com.renewable.gateway.common.constant.cacheconstant.terminal_mac; import static com.renewable.gateway.common.constant.rabbitmqconstant.*; /** * @description: * @author: jarry */ @component public class terminalconsumer { @autowired private iterminalservice iterminalservice; private static final string terminal_config_centcontrol2terminal_exchange = "exchange-terminal-config-centcontrol2terminal"; private static final string terminal_config_centcontrol2terminal_queue = "queue-terminal-config-centcontrol2terminal"; private static final string terminal_config_centcontrol2terminal_routinetype = "topic"; private static final string terminal_config_centcontrol2terminal_bindingkey = "terminal.config.centcontrol2terminal"; @postconstruct public void messageonterminal() throws ioexception, timeoutexception, interruptedexception { address[] addresses = new address[]{ new address(propertiesutil.getproperty(rabbitmq_host)) }; connectionfactory factory = new connectionfactory(); factory.setusername(propertiesutil.getproperty(rabbitmq_user_name)); factory.setpassword(propertiesutil.getproperty(rabbitmq_user_password)); connection connection = factory.newconnection(addresses); final channel channel = connection.createchannel(); channel.basicqos(64); // 设置客户端最多接收未ack的消息个数,避免客户端被冲垮(常用于限流) consumer consumer = new defaultconsumer(channel) { @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { // 1.接收数据,并反序列化出对象 terminal receiveterminalconfig = jsonutil.string2obj(new string(body), terminal.class); // 2.验证是否是该终端的消息的消息 // 避免ack其他终端的消息 if (receiveterminalconfig.getmac() == guavacache.getkey(terminal_mac)) { // 业务代码 serverresponse response = iterminalservice.receiveterminalfromrabbitmq(receiveterminalconfig); if (response.issuccess()) { channel.basicack(envelope.getdeliverytag(), false); } } } }; channel.basicconsume(terminal_config_centcontrol2terminal_queue, consumer); // 等回调函数执行完毕后,关闭资源 // 想了想还是不关闭资源,保持一个监听的状态,从而确保配置的实时更新 // timeunit.seconds.sleep(5); // channel.close(); // connection.close(); } }
小结:
这是早期写的一个demo代码,是直接参照源码的。如果是学习rabbitmq的话,还是建议手写一下这种比较原始的程序,了解其中每个方法的作用,从而理解rabbitmq的思路。如果条件允许的话,还可以查看一下rabbitmq的底层通信协议-amqp(如果不方便下载,也可以私聊我)。
当然,此处可以通过@value直接导入相关配置(乃至到了springcloud后,可以通过@refreshscope等实现配置自动更新)。
与spring集成:
producer:
package com.renewable.terminal.rabbitmq.producer; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; import com.rabbitmq.client.messageproperties; import com.renewable.terminal.pojo.terminal; import com.renewable.terminal.util.jsonutil; import org.springframework.stereotype.component; import java.io.ioexception; import java.util.concurrent.timeoutexception; /** * @description: * @author: jarry */ @component("terminalproducer") public class terminalproducer { private static string rabbitmqhost = "47.92.249.250"; private static string rabbitmquser = "admin"; private static string rabbitmqpassword = "123456"; private static string rabbitmqport = "5672"; private static final string ip_address = rabbitmqhost; private static final int port = integer.parseint(rabbitmqport); private static final string user_name = rabbitmquser; private static final string user_password = rabbitmqpassword; private static final string terminal_config_terminal2centcontrol_exchange = "exchange-terminal-config-terminal2centcontrol"; private static final string terminal_config_terminal2centcontrol_queue = "queue-terminal-config-terminal2centcontrol"; private static final string terminal_config_terminal2centcontrol_routinetype = "topic"; private static final string terminal_config_terminal2centcontrol_bindingkey = "terminal.config.terminal2centcontrol"; private static final string terminal_config_terminal2centcontrol_routingkey = "terminal.config.terminal2centcontrol"; public static void sendterminalconfig(terminal terminal) throws ioexception, timeoutexception, interruptedexception { connectionfactory factory = new connectionfactory(); factory.sethost(ip_address); factory.setport(port); factory.setusername(user_name); factory.setpassword(user_password); connection connection = factory.newconnection(); channel channel = connection.createchannel(); channel.exchangedeclare(terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_routinetype, true, false, null); channel.queuedeclare(terminal_config_terminal2centcontrol_queue, true, false, false, null); channel.queuebind(terminal_config_terminal2centcontrol_queue, terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_bindingkey); string terminalstr = jsonutil.obj2string(terminal); channel.basicpublish(terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_routingkey, messageproperties.persistent_text_plain, terminalstr.getbytes()); channel.close(); connection.close(); } }
consumer:
package com.renewable.terminal.rabbitmq.consumer; import com.rabbitmq.client.*; import com.renewable.terminal.init.serialsensorinit; import com.renewable.terminal.init.terminalinit; import com.renewable.terminal.common.guavacache; import com.renewable.terminal.common.serverresponse; import com.renewable.terminal.pojo.terminal; import com.renewable.terminal.service.iterminalservice; import com.renewable.terminal.util.jsonutil; import lombok.extern.slf4j.slf4j; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.amqpheaders; import org.springframework.beans.factory.annotation.autowired; import org.springframework.messaging.handler.annotation.headers; import org.springframework.messaging.handler.annotation.payload; import org.springframework.stereotype.component; import javax.annotation.postconstruct; import java.io.ioexception; import java.util.map; import java.util.concurrent.timeoutexception; import static com.renewable.terminal.common.constant.cacheconstant.terminal_id; import static com.renewable.terminal.common.constant.cacheconstant.terminal_mac; /** * @description: * @author: jarry */ @component @slf4j public class terminalconsumer { @autowired private iterminalservice iterminalservice; @autowired private serialsensorinit serialsensorinit; private static final string terminal_config_terminal2centcontrol_exchange = "exchange-terminal-config-centcontrol2terminal"; private static final string terminal_config_terminal2centcontrol_queue = "queue-terminal-config-centcontrol2terminal"; private static final string terminal_config_terminal2centcontrol_routinetype = "topic"; private static final string terminal_config_terminal2centcontrol_bindingkey = "terminal.config.centcontrol2terminal"; //todo_finished 2019.05.16 完成终端机terminalconfig的接收与判断(id是否为长随机数,是否需要重新分配) @rabbitlistener(bindings = @queuebinding( value = @queue(value = terminal_config_terminal2centcontrol_queue, declare = "true"), exchange = @exchange(value = terminal_config_terminal2centcontrol_exchange, declare = "true", type = terminal_config_terminal2centcontrol_routinetype), key = terminal_config_terminal2centcontrol_bindingkey )) @rabbithandler public void messageonterminal(@payload string terminalstr, @headers map<string, object> headers, channel channel) throws ioexception { terminal terminal = jsonutil.string2obj(terminalstr, terminal.class); if (terminal == null){ log.info("consume the null terminal config !"); long deliverytag = (long) headers.get(amqpheaders.delivery_tag); channel.basicack(deliverytag, false); } if (!guavacache.getkey(terminal_mac).equals(terminal.getmac())){ log.info("refuse target terminal with mac({}) configure to this terminal with mac({}).",terminal.getmac(), guavacache.getkey(terminal_mac)); return; } // 2.业务逻辑 serverresponse response = iterminalservice.receiveterminalfromrabbitmq(terminal); log.info("start serialsensorinit"); serialsensorinit.init(); // 3.确认 if (response.issuccess()) { long deliverytag = (long) headers.get(amqpheaders.delivery_tag); channel.basicack(deliverytag, false); } } }
配置:
# rabbitmq 消费端配置 spring: rabbitmq: listener: simple: concurrency: 5 max-concurrency: 10 acknowledge-mode: manual # 限流 prefetch: 1 host: "localhost" port: 5672 username: "admin" password: "123456" virtual-host: "/" connection-timeout: 15000
小结:
这里不得不赞一下spring,它通过提供rabbitmq地封装api-ampq,极大地简化了消息队列的代码。其实上述方法就是通过ampq的注解与yml配置来迅速实现rabbitmq的使用。
当然,这里还有很多的提升空间。比如说,通过@bean注解(建立目标配置)与公用方法提取,可以有效提高代码复用性。
简单扩展(与springstream集成):
这段代码并不是线上的代码,而是慕课网学习时留下的代码。主要实际生产中并没有使用springstream,但这确实是认识事件驱动模型的要给很好途径。
producer:
package com.imooc.order.message; import org.springframework.cloud.stream.annotation.input; import org.springframework.cloud.stream.annotation.output; import org.springframework.messaging.messagechannel; import org.springframework.messaging.subscribablechannel; /** * @description: * @author: jarry */ public interface streamclient { string input = "mymessage"; string input2 = "mymessageack"; @input(streamclient.input) subscribablechannel input(); @output(streamclient.input) messagechannel output(); @input(streamclient.input2) subscribablechannel input2(); @output(streamclient.input2) messagechannel output2(); }
package com.imooc.order; import org.junit.assert; import org.junit.test; import org.springframework.amqp.core.amqptemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.component; import java.util.date; /** * @description: * @author: jarry */ @component public class mqsendertest extends orderapplicationtests{ @autowired private amqptemplate amqptemplate; @test public void send(){ amqptemplate.convertandsend("myqueue", "now: " + new date()); assert.assertnotnull(new date()); } }
consumer:
package com.imooc.order.message; import com.imooc.order.dto.orderdto; import lombok.extern.slf4j.slf4j; import org.springframework.cloud.stream.annotation.enablebinding; import org.springframework.cloud.stream.annotation.streamlistener; import org.springframework.messaging.handler.annotation.sendto; import org.springframework.stereotype.component; /** * @description: * @author: jarry */ @component @enablebinding(streamclient.class) @slf4j public class streamreceiver { // @streamlistener(streamclient.input) // public void process(object message){ // log.info("streamreceiver: {}", message); // } @streamlistener(streamclient.input) // 增加以下注解,可以在input消息消费后,返回一个消息。说白了就是rabbitmq对消息消费后的确认回调函数(貌似叫这个,意思就这样,之后细查) @sendto(streamclient.input2) public string process(orderdto message){ log.info("streamreceiver: {}", message); return "received."; } @streamlistener(streamclient.input2) public void process2(string message){ log.info("streamreceiver2: {}", message); } }
总结:
在学习技术的过程中,一方面不断地感受到自己对技术了解的不足,另一方面则是发现更重要的是系统设计中技术选型的权衡。
上一篇: 梦里的冒险