欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#队列学习笔记:MSMQ入门二

程序员文章站 2022-08-07 18:19:57
一、引言 按照专用队列解释: MachineName\Private$\QueueName,只针对于本机的程序才可以调用的队列,有些情况下为了安全起见定义为私有队列。所以刚开始的时候认为,要想访问远程消息队列,只能使用公共队列。但是后来发现,公共队列依赖Domain Controller(域控),在 ......

    一、引言

    按照专用队列解释: machinename\private$\queuename,只针对于本机的程序才可以调用的队列,有些情况下为了安全起见定义为私有队列。所以刚开始的时候认为,要想访问远程消息队列,只能使用公共队列。但是后来发现,公共队列依赖domain controller(域控),在实际部署的时候,要求使用消息队列的应用一定要在某个域中,有些太苛刻!后来发现,私有队列也是可以远程访问的。(很困惑为什么私有队列只能本地访问,这句话,到处都能看到?!)

    二、工作组下的本地c/s

    2.1、项目建立

    新建4个项目:

C#队列学习笔记:MSMQ入门二

    2.2、项目代码

    2.2.1、model项目

    /// <summary>
    /// 消息队列实体
    /// </summary>
    [serializable]
    public class mqmessage
    {
        /// <summary>
        /// 对应message的label
        /// </summary>
        public string label { get; set; }

        /// <summary>
        /// 对应message的body,commandtype为操作类型,list<string>为操作列表。
        /// </summary>
        public dictionary<commandtype, list<string>> body { get; set; } = new dictionary<commandtype, list<string>>();

        /// <summary>
        /// 无参构造函数
        /// </summary>
        public mqmessage()
        {
        }

        /// <summary>
        /// 有参构造函数
        /// </summary>
        /// <param name="label"></param>
        /// <param name="body"></param>
        public mqmessage(string label, dictionary<commandtype, list<string>> body)
        {
            label = label;
            body = body;
        }
    }

    /// <summary>
    /// 操作类型
    /// </summary>
    public enum commandtype
    {
        create = 1, //创建
        update = 2, //更新
        delete = 3  //删除
    }

    2.2.2、common项目

    /// <summary>
    /// 日志帮助类
    /// </summary>
    public static class loghelper
    {
        private static readonly string errlogsavepath = configurationmanager.appsettings["errlogsavepath"] ?? appdomain.currentdomain.basedirectory;

        /// <summary>
        /// 异常日志方法重载
        /// </summary>
        /// <param name="ex">异常信息</param>
        public static void writelog(exception ex)
        {
            writelog(geterrmsg(ex));
        }

        /// <summary>
        /// 异常日志方法重载
        /// </summary>
        /// <param name="message">日志内容</param>
        public static void writelog(string message)
        {
            writelog(errlogsavepath, message);
        }

        /// <summary>
        /// 异常日志方法重载
        /// </summary>
        /// <param name="filepath">日志文件路径</param>
        /// <param name="message">日志内容</param>
        public static void writelog(string filepath, string message)
        {
            try
            {
                if (!directory.exists(filepath))
                {
                    directory.createdirectory(filepath);
                }
                string filename = datetime.now.tostring("yyyy-mm-dd") + ".txt";
                using (streamwriter sw = new streamwriter(filepath + "\\" + filename, true))
                {
                    sw.writeline("--------------------------------------------");
                    sw.writeline($"{datetime.now.tolongtimestring()}:{datetime.now.millisecond}\t{message}");
                    sw.close();
                }
            }
            catch (exception ex)
            {
                throw new exception(geterrmsg(ex));
            }
        }

        /// <summary>
        /// 获取异常详细信息
        /// </summary>
        /// <param name="ex"></param>
        /// <returns></returns>
        private static string geterrmsg(exception ex)
        {
            string errmessage = "";
            for (exception tempexception = ex; tempexception != null; tempexception = tempexception.innerexception)
            {
                errmessage += tempexception.message + environment.newline + environment.newline;
            }
            errmessage += ex.tostring();
            return errmessage;
        }
    }
    /// <summary>
    /// 消息队列管理器
    /// </summary>
    public class mqmanager : idisposable
    {
        private messagequeue _mq = null;
        private readonly linktype linktype = linktype.localhost;    //链接类型,远程时使用linktype.remoteserver。
        private readonly string remoteserver = "192.168.2.165";     //远程服务器ip地址

        public static mqmanager linkserver { get; } = new mqmanager();

        /// <summary>
        /// 初始化函数
        /// </summary>
        /// <param name="linktype">链接类型</param>
        public void mqmanagerinit(linktype linktype)
        {
            if (_mq == null)
            {
                string _path;
                if (linktype == linktype.localhost)
                {
                    _path = @".\private$\" + (configurationmanager.appsettings["msmqname"] ?? "helloworld");
                }
                else
                {
                    _path = "formatname:direct=tcp:" + remoteserver + @"\private$\" + (configurationmanager.appsettings["msmqname"] ?? "helloworld");
                }
                _mq = new messagequeue(_path)
                {
                    formatter = new binarymessageformatter()
                };
            }
        }

        /// <summary>
        /// 有参构造函数
        /// </summary>
        public mqmanager()
        {
            mqmanagerinit(linktype);
        }

        /// <summary>
        /// 发送消息队列(事务)
        /// </summary>
        /// <param name="message"></param>
        public void send(mqmessage message)
        {
            messagequeuetransaction transaction = new messagequeuetransaction();
            transaction.begin();
            _mq.send(message.body, message.label, transaction);
            transaction.commit();
        }

        /// <summary>
        /// 接收消息队列
        /// </summary>
        /// <returns></returns>
        public message receive()
        {
            message msg = null;
            try
            {
                msg = _mq.receive(new timespan(0, 0, 1));
            }
            catch (exception ex)
            {
                throw new exception(ex.message);
            }

            return msg;
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void dispose()
        {
            if (_mq != null)
            {
                _mq.close();
                _mq.dispose();
                _mq = null;
            }
        }
    }

    /// <summary>
    /// 链接类型
    /// </summary>
    public enum linktype
    {
        localhost = 1,      //本地服务器
        remoteserver = 2    //远程服务器
    }

    2.2.3、send项目

    class program
    {
        static void main(string[] args)
        {
            mqmessage mqmessage = new mqmessage();
            list<string> list = new list<string>();

            console.writeline("请输入内容按回车发送,多个内容请用英文逗号隔开,退出请输入exit。");
            string receivekey = console.readline();

            while (receivekey.tolower() != "exit")
            {
                if (receivekey.length > 0)
                {
                    mqmessage.label = guid.newguid().tostring();

                    list.clear();
                    list = receivekey.split(new char[] { ',' }).tolist();
                    mqmessage.body.clear();
                    mqmessage.body.add(commandtype.create, list);
                    try
                    {
                        mqmanager.linkserver.send(mqmessage);
                        console.writeline("内容已发送成功。");
                    }
                    catch (exception ex)
                    {
                        console.writeline(ex.message);
                        loghelper.writelog(ex);
                    }
                }
                receivekey = console.readline();
            }

            mqmanager.linkserver.dispose();
        }
    }

    2.2.4、receive项目

    /// <summary>
    /// 接收消息队列管理(线程)
    /// </summary>
    public class receivemanager : idisposable
    {
        private thread _thread = null;

        public static receivemanager instance { get; set; } = new receivemanager();

        /// <summary>
        /// 开始
        /// </summary>
        public void start()
        {
            startreceive();
        }

        /// <summary>
        /// 接收线程
        /// </summary>
        private void startreceive()
        {
            _thread = new thread(new threadstart(receive))
            {
                name = "receivethread",
                isbackground = true
            };
            _thread.start();
        }

        /// <summary>
        /// 接收线程调用方法
        /// </summary>
        private void receive()
        {
            message msg = null;
            while (true)
            {
                try
                {
                    msg = mqmanager.linkserver.receive();
                    if (msg != null)
                    {
                        console.writeline("----------------------------------------------------");
                        console.writeline("lable: " + msg.label);
                        dictionary<commandtype, list<string>> keyvaluepairs = msg.body as dictionary<commandtype, list<string>>;
                        console.writeline("body commandtype: " + keyvaluepairs.keys.first());
                        console.writeline("body details: ");
                        foreach (var item in keyvaluepairs.values.first())
                        {
                            console.writeline(item);
                        }
                        console.writeline("----------------------------------------------------");
                    }
                }
                catch (exception ex)
                {
                    console.writeline(ex.message);
                    loghelper.writelog(ex);
                }
                thread.sleep(1000);
            }
        }

        /// <summary>
        /// 结束
        /// </summary>
        public void stop()
        {
            dispose();
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void dispose()
        {
            try
            {
                if (_thread != null)
                {
                    _thread.abort();
                    _thread.join();
                    _thread = null;
                }

                mqmanager.linkserver.dispose();
            }
            catch (exception ex)
            {
                console.writeline(ex.message);
            }
        }
    }
    class program
    {
        static void main(string[] args)
        {
            receivemanager.instance.start();
            console.writeline("退出请输入exit");
            string receivekey = console.readline();
            while (receivekey.tolower() != "exit")
            {
                receivekey = console.readline();
            }
            receivemanager.instance.stop();
            console.read();
        }
    }

    2.3、运行测试

    客户端发送hello,world:

C#队列学习笔记:MSMQ入门二

    服务端接收到的信息:

C#队列学习笔记:MSMQ入门二

    三、工作组下的远程c/s

    3.1、代码调整

    工作组下的远程c/s,代码已经在上面的示例中提供,将common\mqmanager.cs下的:

    private readonly linktype linktype = linktype.localhost;改成private readonly linktype linktype = linktype.remoteserver;即可。

    3.2、访问权限

    既然要与远程服务器交互(发送/接收)队列信息,首当其冲的是访问权限问题,没有权限,一切免谈。

    下面讲一下远程服务器(代码中的192.168.2.165,win7系统)要设置的内容:

    3.2.1、在运行中输入compmgmt.msc->服务和应用程序->消息队列->右键属性->服务器安全性->禁用未经身份验证的 rpc 调用->把勾勾去掉->应用。

C#队列学习笔记:MSMQ入门二

    3.2.2、在消息队列->专用队列->新建一个代码中用到的helloworld队列,勾上事务性->确定。

C#队列学习笔记:MSMQ入门二

    为什么要手工建helloworld消息队列?因为要对这个队列进行匿名访问授权,后面会讲到。至于事务性这个勾,这个要与代码相一致。因为本示例中使用了messagequeuetransaction来发送事务信息,所以必须得勾上这个勾,不然的话,发送时没有任何的报错信息,但是服务器就是收不到队列信息。

    3.2.3、专用队列->helloworld->右键属性->安全->anonymous logon->完全控制->应用。

C#队列学习笔记:MSMQ入门二

    3.2.4、在运行中输入regedit->hkey_local_machine\software\microsoft\msmq\parameters\security->新建两个dword值:allownonauthenticatedrpc、newremotereadserverdenyworkgroupclient->分别双击将数值数据改成1。

C#队列学习笔记:MSMQ入门二

    3.2.5、关于防火墙,我是关闭了的,假如您的电脑防火墙是打开了的话,请检查一下message queuing是不是被允许的?

C#队列学习笔记:MSMQ入门二

    3.3、运行测试

    客户端发送a,b,c,d:

C#队列学习笔记:MSMQ入门二

    服务器端接收到的信息:

C#队列学习笔记:MSMQ入门二

 

    参考自: