ActiveMQ在C#中的应用示例分析
本文实例讲述了activemq在c#中的应用。分享给大家供大家参考,具体如下:
activemq是个好东东,不必多说。activemq提供多种语言支持,如java, c, c++, c#, ruby, perl, python, php等。由于我在windows下开发gui,比较关心c++和c#,其中c#的activemq很简单,apache提供nms(.net messaging service)支持.net开发,只需如下几个步骤即能建立简单的实现。c++的应用相对麻烦些,后面会有文章介绍。
1、去activemq官方网站下载最新版的activemq,网址:。我之前下的是5.3.1,5.3.2现在也已经出来了。
2、去activemq官方网站下载最新版的apache.nms,网址:,需要下载apache.nms和apache.nms.activemq两个bin包,如果对源码感兴趣,也可下载src包。这里要提醒一下,如果下载1.2.0版本的nms.activemq,apache.nms.activemq.dll在实际使用中有个bug,即停止activemq应用时会抛waitone函数异常,查看src包中的源码发现是由于apache.nms.activemq-1.2.0-src\src\main\csharp\transport\inactivitymonitor.cs中的如下代码造成的,修改一下源码重新编译即可。看了一下最新版1.3.0已经修复了这个bug,因此下载最新版即可。
private void stopmonitorthreads() { lock(monitor) { if(monitorstarted.compareandset(true, false)) { autoresetevent shutdownevent = new autoresetevent(false); // attempt to wait for the timers to shutdown, but don't wait // forever, if they don't shutdown after two seconds, just quit. this.readchecktimer.dispose(shutdownevent); shutdownevent.waitone(timespan.frommilliseconds(2000)); this.writechecktimer.dispose(shutdownevent); shutdownevent.waitone(timespan.frommilliseconds(2000)); //waitone的定义:public virtual bool waitone(timespan timeout,bool exitcontext) this.asynctasks.shutdown(); this.asynctasks = null; this.asyncwritetask = null; this.asyncerrortask = null; } } }
3、运行activemq,找到activemq解压后的bin文件夹:...\apache-activemq-5.3.1\bin,执行activemq.bat批处理文件即可启动activemq服务器,默认端口为61616,这可在配置文件中修改。
4、写c#程序实现activemq的简单应用。新建c#工程(一个producter项目和一个consumer项目),winform或console程序均可,这里建的是console工程,添加对apache.nms.dll和apache.nms.activemq.dll的引用,然后即可编写实现代码了,简单的producer和consumer实现代码如下:
producer:
using system; using system.collections.generic; using system.text; using apache.nms; using apache.nms.activemq; using system.io; using system.xml.serialization; using system.runtime.serialization.formatters.binary; namespace publish { class program { static void main(string[] args) { try { //create the connection factory iconnectionfactory factory = new connectionfactory("tcp://localhost:61616/"); using (iconnection connection = factory.createconnection()) { //create the session using (isession session = connection.createsession()) { //create the producer for the topic/queue imessageproducer prod = session.createproducer( new apache.nms.activemq.commands.activemqtopic("testing")); //send messages int i = 0; while (!console.keyavailable) { itextmessage msg = prod.createtextmessage(); msg.text = i.tostring(); console.writeline("sending: " + i.tostring()); prod.send(msg, apache.nms.msgdeliverymode.nonpersistent, apache.nms.msgpriority.normal, timespan.minvalue); system.threading.thread.sleep(5000); i++; } } } console.readline(); } catch (system.exception e) { console.writeline("{0}",e.message); console.readline(); } } } }
consumer:
using system; using system.collections.generic; using system.text; using apache.nms; using apache.nms.activemq; using system.io; using system.xml.serialization; using system.runtime.serialization.formatters.binary; namespace subscribe { class program { static void main(string[] args) { try { //create the connection factory iconnectionfactory factory = new connectionfactory("tcp://localhost:61616/"); //create the connection using (iconnection connection = factory.createconnection()) { connection.clientid = "testing listener"; connection.start(); //create the session using (isession session = connection.createsession()) { //create the consumer imessageconsumer consumer = session.createdurableconsumer(new apache.nms.activemq.commands.activemqtopic("testing"), "testing listener", null, false); consumer.listener += new messagelistener(consumer_listener); console.readline(); } connection.stop(); connection.close(); } } catch (system.exception e) { console.writeline(e.message); } } static void consumer_listener(imessage message) { try { itextmessage msg = (itextmessage)message; console.writeline("receive: " + msg.text); } catch (system.exception e) { console.writeline(e.message); } } } }
程序实现的功能:生产者producer建立名为testing的主题,并每隔5秒向该主题发送消息,消费者consumer订阅了testing主题,因此只要生产者发送testing主题的消息到activemq服务器,服务器就将该消息发送给订阅了testing主题的消费者。
编译生成producer.exe和consumer.exe,并执行两个exe,即可看到消息的发送与接收了。
这个例子是建的主题(topic),activemq还支持另一种方式:queue,即p2p,两者有什么区别呢?区别在于,topic是广播,即如果某个topic被多个消费者订阅,那么只要有消息到达服务器,服务器就将该消息发给全部的消费者;而queue是点到点,即一个消息只能发给一个消费者,如果某个queue被多个消费者订阅,没有特殊情况的话消息会一个一个地轮流发给不同的消费者,比如:
msg1-->consumer a
msg2-->consumer b
msg3-->consumer c
msg4-->consumer a
msg5-->consumer b
msg6-->consumer c
特殊情况是指:activemq支持过滤机制,即生产者可以设置消息的属性(properties),该属性与消费者端的selector对应,只有消费者设置的selector与消息的properties匹配,消息才会发给该消费者。topic和queue都支持selector。
properties和selector该如何设置呢?请看如下代码:
producer:
itextmessage msg = prod.createtextmessage(); msg.text = i.tostring(); msg.properties.setstring("myfilter", "test1"); console.writeline("sending: " + i.tostring()); prod.send(msg, apache.nms.msgdeliverymode.nonpersistent, apache.nms.msgpriority.normal, timespan.minvalue);
consumer:
//生成consumer时通过参数设置selector imessageconsumer consumer = session.createconsumer(new apache.nms.activemq.commands.activemqqueue("testing"), "myfilter='test1'");
更多关于c#相关内容感兴趣的读者可查看本站专题:《c#窗体操作技巧汇总》、《c#常见控件用法教程》、《winform控件用法总结》、《c#程序设计之线程使用技巧总结》、《c#操作excel技巧总结》、《c#中xml文件操作技巧汇总》、《c#数据结构与算法教程》、《c#数组操作技巧总结》及《c#面向对象程序设计入门教程》
希望本文所述对大家c#程序设计有所帮助。