C#实现同Active MQ通讯的方法
程序员文章站
2022-05-18 13:01:09
本文实例讲述了c#实现同active mq通讯的方法。分享给大家供大家参考,具体如下:
内容概要:
主要以源码的形式介绍如何用c#实现同active mq 的通讯。本文...
本文实例讲述了c#实现同active mq通讯的方法。分享给大家供大家参考,具体如下:
内容概要:
主要以源码的形式介绍如何用c#实现同active mq 的通讯。本文假设你已经正确安装jdk1.6.x,了解active mq并有一定的编程基础。
正文:
jms 程序的最终目的是生产和消费的消息能被其他程序使用,jms 的 message 是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非jms 程序格式的消息。
message 由消息头,属性和消息体三部份组成。
active mq支持过滤机制,即生产者可以设置消息的属性(properties),该属性与消费者端的selector对应,只有消费者设置的selector与消息的properties匹配,消息才会发给该消费者。topic和queue都支持selector。
示例代码:
using system; using system.collections.generic; using system.linq; using system.text; using system.windows; using system.windows.controls; using system.windows.data; using system.windows.documents; using system.windows.input; using system.windows.media; using system.windows.media.imaging; using system.windows.navigation; using system.windows.shapes; using apache.nms; using system.diagnostics; using apache.nms.util; using system.windows.threading; /* * 功能描述:c#使用activemq示例 * 修改次数:2 * 最后更新: by kagula,2012-07-31 * * 前提条件: * [1]apache-activemq-5.4.2 * [2]apache.nms.activemq-1.5.6-bin * [3]winxp sp3 * [4]vs2008 sp1 * [5]wpf工程 with .net framework 3.5 * * 启动 * * 不带安全控制方式启动 * [你的解压路径]\apache-activemq-5.4.2\bin\activemq.bat * * 安全方式启动 * 添加环境变量: activemq_encryption_password=activemq * [你的解压路径]\apache-activemq-5.4.2\bin>activemq xbean:file:../conf/activemq-security.xml * * active mq 管理地址 * http://127.0.0.1:8161/admin/ * 添加访问"http://127.0.0.1:8161/admin/"的限制 * * 第一步:添加访问限制 * 修改d:\apache\apache-activemq-5.4.2\conf\jetty.xml文件 * 下面这行编码,原 * <property name="authenticate" value="true" /> * 修改为 * <property name="authenticate" value="false" /> * * 第二步:修改登录用户名密码,缺省分别为admin,admin * d:\apache\apache-activemq-5.4.2\conf\jetty-realm.properties * * 用户管理(前提:以安全方式启动activemq) * * 在[你的解压路径]\apache-activemq-5.4.2\conf\credentials.properties文件中修改默认的用户名密码 * 在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中可以添加新的用户名 * e.g. 添加oa用户,密码同用户名。 * <authenticationuser username="oa" password="oa" groups="users,admins"/> * * 在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中你还可以设置指定的topic或queue * 只能被哪些用户组read 或 write。 * * * 配置c# with wpf项目 * 项目的[application]->[targetframework]属性设置为[.netframework 3.5](这是vs2008wpf工程的默认设置) * 添加[你的解压路径]\apache.nms.activemq-1.5.6-bin\lib\apache.nms\net-3.5\apache.nms.dll的引用 * apache.nms.dll相当于接口 * * 如果是以debug方式调试 * 把[你的解压路径]\apache.nms.activemq-1.5.6-bin\build\net-3.5\debug\目录下的 * apache.nms.activemq.dll文件复制到你项目的debug目录下 * apache.nms.activemq.dll相当于实现 * * 如果是以release方式调试 * 参考上文,去取apache.nms,release目录下相应的dll文件,并复制到你项目的release目录下。 * * * 参考资料 * [1]《c#调用activemq官方示例》 http://activemq.apache.org/nms/examples.html * [2]《activemq nms下载地址》http://activemq.apache.org/nms/activemq-downloads.html * [3]《active mq在c#中的应用示例》//www.jb51.net/article/87956.htm * [4]《nms api reference》http://activemq.apache.org/nms/nms-api.html */ namespace testactivemqsubscriber { /// <summary> /// interaction logic for window1.xaml /// </summary> public partial class window1 : window { private static iconnectionfactory connfac; private static iconnection connection; private static isession session; private static idestination destination; private static imessageproducer producer; private static imessageconsumer consumer; protected static itextmessage message = null; public window1() { initializecomponent(); initamq("myfirsttopic"); } private void initamq(string strtopicname) { try { connfac = new nmsconnectionfactory(new uri("activemq:failover:(tcp://localhost:61616)")); //新建连接 //connection = connfac.createconnection("oa","oa");//设置连接要用的用户名、密码 //如果你要持久“订阅”,则需要设置clientid,这样程序运行当中被停止,恢复运行时,能拿到没接收到的消息! connection.clientid = "testing listener"; connection = connfac.createconnection();//如果你是缺省方式启动active mq服务,则不需填用户名、密码 //创建session session = connection.createsession(); //发布/订阅模式,适合一对多的情况 destination = sessionutil.getdestination(session, "topic://" + strtopicname); //新建生产者对象 producer = session.createproducer(destination); producer.deliverymode = msgdeliverymode.nonpersistent;//activemq服务器停止工作后,消息不再保留 //新建消费者对象:普通“订阅”模式 //consumer = session.createconsumer(destination);//不需要持久“订阅” //新建消费者对象:持久"订阅"模式: // 持久“订阅”后,如果你的程序被停止工作后,恢复运行, //从第一次持久订阅开始,没收到的消息还可以继续收 consumer = session.createdurableconsumer( session.gettopic(strtopicname) , connection.clientid, null, false); //设置消息接收事件 consumer.listener += new messagelistener(onmessage); //启动来自active mq的消息侦听 connection.start(); } catch (exception e) { //初始化activemq连接失败,往vs2008的output窗口写入出错信息! debug.writeline(e.message); } } private void sendmsg2topic_click(object sender, routedeventargs e) { //发送消息 itextmessage request = session.createtextmessage(datetime.now.tolocaltime()+" "+tbmsg.text); producer.send(request); } protected void onmessage(imessage receivedmsg) { //接收消息 message = receivedmsg as itextmessage; //ui线程,显示收到的消息 dispatcher.invoke(dispatcherpriority.normal, new action(() => { datetime dt = new datetime(); listboxitem lbi = new listboxitem(); lbi.content = datetime.now.tolocaltime() + " " + message.text; lbr.items.add(lbi); })); } } }
队列通讯方式,消费者例子
using system; using system.collections.generic; using system.linq; using system.text; using apache.nms; using system.diagnostics; using log4net; using apache.nms.util; using system.collections; namespace cat8637autocallserver { public class smtask { public string callee { get; set; } public string checknumber { get; set; } public int deadline { get; set; } public override string tostring() { return string.format("callee={0},checknumber={1},deadline={2}", callee,checknumber,deadline); } } /* * 负责接收任务,并把任务放在任务等待队列中。 */ public class mqclient { private static readonly ilog logger = logmanager.getlogger(typeof(mqclient)); private static iconnection connection = null; private static isession session = null; queue _voicesmtasks = new queue(); public mqclient() { try { iconnectionfactory factory = new nmsconnectionfactory(new uri("activemq:failover:(tcp://localhost:61616)")); //新建连接 //connection = connfac.createconnection("oa","oa");//设置连接要用的用户名、密码 connection = factory.createconnection(); session = connection.createsession(); imessageconsumer consumer = session.createconsumer(session.getqueue("taskissue_voicesm")); consumer.listener += new messagelistener(onmessage); connection.start(); } catch (exception ex) { debug.writeline(ex.message); } } protected void onmessage(imessage receivedmsg) { imessage message = receivedmsg as itextmessage; smtask smtask = new smtask(); smtask.callee = message.properties["callee"] as string; smtask.checknumber = message.properties["message"] as string; smtask.deadline = convert.toint32(message.properties["deadline"] as string); logger.info("received: "+smtask.tostring()); lock (_voicesmtasks) { _voicesmtasks.enqueue(smtask); } } public smtask getvoicesmtask() { smtask result = null; lock (_voicesmtasks) { if (_voicesmtasks.count > 0) { result = _voicesmtasks.dequeue() as smtask; } } return result; } } }
队列通讯方式,生产者例子
private void send_click(object sender, routedeventargs e) { try { idestination destination = sessionutil.getdestination(session, "queue://taskissue_voicesm"); //新建生产者对象 imessageproducer producer = session.createproducer(destination); producer.deliverymode = msgdeliverymode.nonpersistent;//activemq服务器停止工作后,消息不再保留 itextmessage request = session.createtextmessage(); request.nmscorrelationid = "testvoicesm";//这里我填了应用程序的名称。 request.properties["callee"] = tbcallee.text; request.properties["message"] = tbchecknumber.text; request.properties["deadline"] = tbvalidduration.text; producer.send(request); } catch (exception ex) { //初始化activemq连接失败,往vs2008的output窗口写入出错信息! debug.writeline(ex.message); } } private void window_closed(object sender, eventargs e) { try { if (session == null) return; //if (connection == null) // return; session.close(); //connection.close(); } catch (exception ex) { debug.writeline(ex.message); } }
更多关于c#相关内容感兴趣的读者可查看本站专题:《c#窗体操作技巧汇总》、《c#常见控件用法教程》、《winform控件用法总结》、《c#程序设计之线程使用技巧总结》、《c#操作excel技巧总结》、《c#中xml文件操作技巧汇总》、《c#数据结构与算法教程》、《c#数组操作技巧总结》及《c#面向对象程序设计入门教程》
希望本文所述对大家c#程序设计有所帮助。