C#队列学习笔记:MSMQ入门二
一、引言
按照专用队列解释: machinename\private$\queuename,只针对于本机的程序才可以调用的队列,有些情况下为了安全起见定义为私有队列。所以刚开始的时候认为,要想访问远程消息队列,只能使用公共队列。但是后来发现,公共队列依赖domain controller(域控),在实际部署的时候,要求使用消息队列的应用一定要在某个域中,有些太苛刻!后来发现,私有队列也是可以远程访问的。(很困惑为什么私有队列只能本地访问,这句话,到处都能看到?!)
二、工作组下的本地c/s
2.1、项目建立
新建4个项目:
2.2、项目代码
2.2.1、model项目
/// <summary> /// 消息队列实体 /// </summary> [serializable] public class mqmessage { /// <summary> /// 对应message的label /// </summary> public string label { get; set; } /// <summary> /// 对应message的body,commandtype为操作类型,list<string>为操作列表。 /// </summary> public dictionary<commandtype, list<string>> body { get; set; } = new dictionary<commandtype, list<string>>(); /// <summary> /// 无参构造函数 /// </summary> public mqmessage() { } /// <summary> /// 有参构造函数 /// </summary> /// <param name="label"></param> /// <param name="body"></param> public mqmessage(string label, dictionary<commandtype, list<string>> body) { label = label; body = body; } } /// <summary> /// 操作类型 /// </summary> public enum commandtype { create = 1, //创建 update = 2, //更新 delete = 3 //删除 }
2.2.2、common项目
/// <summary> /// 日志帮助类 /// </summary> public static class loghelper { private static readonly string errlogsavepath = configurationmanager.appsettings["errlogsavepath"] ?? appdomain.currentdomain.basedirectory; /// <summary> /// 异常日志方法重载 /// </summary> /// <param name="ex">异常信息</param> public static void writelog(exception ex) { writelog(geterrmsg(ex)); } /// <summary> /// 异常日志方法重载 /// </summary> /// <param name="message">日志内容</param> public static void writelog(string message) { writelog(errlogsavepath, message); } /// <summary> /// 异常日志方法重载 /// </summary> /// <param name="filepath">日志文件路径</param> /// <param name="message">日志内容</param> public static void writelog(string filepath, string message) { try { if (!directory.exists(filepath)) { directory.createdirectory(filepath); } string filename = datetime.now.tostring("yyyy-mm-dd") + ".txt"; using (streamwriter sw = new streamwriter(filepath + "\\" + filename, true)) { sw.writeline("--------------------------------------------"); sw.writeline($"{datetime.now.tolongtimestring()}:{datetime.now.millisecond}\t{message}"); sw.close(); } } catch (exception ex) { throw new exception(geterrmsg(ex)); } } /// <summary> /// 获取异常详细信息 /// </summary> /// <param name="ex"></param> /// <returns></returns> private static string geterrmsg(exception ex) { string errmessage = ""; for (exception tempexception = ex; tempexception != null; tempexception = tempexception.innerexception) { errmessage += tempexception.message + environment.newline + environment.newline; } errmessage += ex.tostring(); return errmessage; } }
/// <summary> /// 消息队列管理器 /// </summary> public class mqmanager : idisposable { private messagequeue _mq = null; private readonly linktype linktype = linktype.localhost; //链接类型,远程时使用linktype.remoteserver。 private readonly string remoteserver = "192.168.2.165"; //远程服务器ip地址 public static mqmanager linkserver { get; } = new mqmanager(); /// <summary> /// 初始化函数 /// </summary> /// <param name="linktype">链接类型</param> public void mqmanagerinit(linktype linktype) { if (_mq == null) { string _path; if (linktype == linktype.localhost) { _path = @".\private$\" + (configurationmanager.appsettings["msmqname"] ?? "helloworld"); } else { _path = "formatname:direct=tcp:" + remoteserver + @"\private$\" + (configurationmanager.appsettings["msmqname"] ?? "helloworld"); } _mq = new messagequeue(_path) { formatter = new binarymessageformatter() }; } } /// <summary> /// 有参构造函数 /// </summary> public mqmanager() { mqmanagerinit(linktype); } /// <summary> /// 发送消息队列(事务) /// </summary> /// <param name="message"></param> public void send(mqmessage message) { messagequeuetransaction transaction = new messagequeuetransaction(); transaction.begin(); _mq.send(message.body, message.label, transaction); transaction.commit(); } /// <summary> /// 接收消息队列 /// </summary> /// <returns></returns> public message receive() { message msg = null; try { msg = _mq.receive(new timespan(0, 0, 1)); } catch (exception ex) { throw new exception(ex.message); } return msg; } /// <summary> /// 释放资源 /// </summary> public void dispose() { if (_mq != null) { _mq.close(); _mq.dispose(); _mq = null; } } } /// <summary> /// 链接类型 /// </summary> public enum linktype { localhost = 1, //本地服务器 remoteserver = 2 //远程服务器 }
2.2.3、send项目
class program { static void main(string[] args) { mqmessage mqmessage = new mqmessage(); list<string> list = new list<string>(); console.writeline("请输入内容按回车发送,多个内容请用英文逗号隔开,退出请输入exit。"); string receivekey = console.readline(); while (receivekey.tolower() != "exit") { if (receivekey.length > 0) { mqmessage.label = guid.newguid().tostring(); list.clear(); list = receivekey.split(new char[] { ',' }).tolist(); mqmessage.body.clear(); mqmessage.body.add(commandtype.create, list); try { mqmanager.linkserver.send(mqmessage); console.writeline("内容已发送成功。"); } catch (exception ex) { console.writeline(ex.message); loghelper.writelog(ex); } } receivekey = console.readline(); } mqmanager.linkserver.dispose(); } }
2.2.4、receive项目
/// <summary> /// 接收消息队列管理(线程) /// </summary> public class receivemanager : idisposable { private thread _thread = null; public static receivemanager instance { get; set; } = new receivemanager(); /// <summary> /// 开始 /// </summary> public void start() { startreceive(); } /// <summary> /// 接收线程 /// </summary> private void startreceive() { _thread = new thread(new threadstart(receive)) { name = "receivethread", isbackground = true }; _thread.start(); } /// <summary> /// 接收线程调用方法 /// </summary> private void receive() { message msg = null; while (true) { try { msg = mqmanager.linkserver.receive(); if (msg != null) { console.writeline("----------------------------------------------------"); console.writeline("lable: " + msg.label); dictionary<commandtype, list<string>> keyvaluepairs = msg.body as dictionary<commandtype, list<string>>; console.writeline("body commandtype: " + keyvaluepairs.keys.first()); console.writeline("body details: "); foreach (var item in keyvaluepairs.values.first()) { console.writeline(item); } console.writeline("----------------------------------------------------"); } } catch (exception ex) { console.writeline(ex.message); loghelper.writelog(ex); } thread.sleep(1000); } } /// <summary> /// 结束 /// </summary> public void stop() { dispose(); } /// <summary> /// 释放资源 /// </summary> public void dispose() { try { if (_thread != null) { _thread.abort(); _thread.join(); _thread = null; } mqmanager.linkserver.dispose(); } catch (exception ex) { console.writeline(ex.message); } } }
class program { static void main(string[] args) { receivemanager.instance.start(); console.writeline("退出请输入exit"); string receivekey = console.readline(); while (receivekey.tolower() != "exit") { receivekey = console.readline(); } receivemanager.instance.stop(); console.read(); } }
2.3、运行测试
客户端发送hello,world:
服务端接收到的信息:
三、工作组下的远程c/s
3.1、代码调整
工作组下的远程c/s,代码已经在上面的示例中提供,将common\mqmanager.cs下的:
private readonly linktype linktype = linktype.localhost;改成private readonly linktype linktype = linktype.remoteserver;即可。
3.2、访问权限
既然要与远程服务器交互(发送/接收)队列信息,首当其冲的是访问权限问题,没有权限,一切免谈。
下面讲一下远程服务器(代码中的192.168.2.165,win7系统)要设置的内容:
3.2.1、在运行中输入compmgmt.msc->服务和应用程序->消息队列->右键属性->服务器安全性->禁用未经身份验证的 rpc 调用->把勾勾去掉->应用。
3.2.2、在消息队列->专用队列->新建一个代码中用到的helloworld队列,勾上事务性->确定。
为什么要手工建helloworld消息队列?因为要对这个队列进行匿名访问授权,后面会讲到。至于事务性这个勾,这个要与代码相一致。因为本示例中使用了messagequeuetransaction来发送事务信息,所以必须得勾上这个勾,不然的话,发送时没有任何的报错信息,但是服务器就是收不到队列信息。
3.2.3、专用队列->helloworld->右键属性->安全->anonymous logon->完全控制->应用。
3.2.4、在运行中输入regedit->hkey_local_machine\software\microsoft\msmq\parameters\security->新建两个dword值:allownonauthenticatedrpc、newremotereadserverdenyworkgroupclient->分别双击将数值数据改成1。
3.2.5、关于防火墙,我是关闭了的,假如您的电脑防火墙是打开了的话,请检查一下message queuing是不是被允许的?
3.3、运行测试
客户端发送a,b,c,d:
服务器端接收到的信息:
参考自:
上一篇: MongoDB中的参数限制与阀值详析