VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > Python基础教程 >
  • C#队列学习笔记:MSMQ入门二

一、引言

    按照专用队列解释: MachineName\Private$\QueueName,只针对于本机的程序才可以调用的队列,有些情况下为了安全起见定义为私有队列。所以刚开始的时候认为,要想访问远程消息队列,只能使用公共队列。但是后来发现,公共队列依赖Domain Controller(域控),在实际部署的时候,要求使用消息队列的应用一定要在某个域中,有些太苛刻!后来发现,私有队列也是可以远程访问的。(很困惑为什么私有队列只能本地访问,这句话,到处都能看到?!)

    二、工作组下的本地C/S

    2.1、项目建立

    新建4个项目:

    2.2、项目代码

    2.2.1、Model项目

复制代码
    /// <summary>
    /// 消息队列实体
    /// </summary>
    [Serializable]
    public class MqMessage
    {
        /// <summary>
        /// 对应Message的Label
        /// </summary>
        public string Label { get; set; }

        /// <summary>
        /// 对应Message的Body,CommandType为操作类型,List<string>为操作列表。
        /// </summary>
        public Dictionary<CommandType, List<string>> Body { get; set; } = new Dictionary<CommandType, List<string>>();

        /// <summary>
        /// 无参构造函数
        /// </summary>
        public MqMessage()
        {
        }

        /// <summary>
        /// 有参构造函数
        /// </summary>
        /// <param name="label"></param>
        /// <param name="body"></param>
        public MqMessage(string label, Dictionary<CommandType, List<string>> body)
        {
            Label = label;
            Body = body;
        }
    }

    /// <summary>
    /// 操作类型
    /// </summary>
    public enum CommandType
    {
        Create = 1, //创建
        Update = 2, //更新
        Delete = 3  //删除
    }
复制代码

    2.2.2、Common项目

复制代码
    /// <summary>
    /// 日志帮助类
    /// </summary>
    public static class LogHelper
    {
        private static readonly string errLogSavePath = ConfigurationManager.AppSettings["ErrLogSavePath"] ?? AppDomain.CurrentDomain.BaseDirectory;

        /// <summary>
        /// 异常日志方法重载
        /// </summary>
        /// <param name="ex">异常信息</param>
        public static void WriteLog(Exception ex)
        {
            WriteLog(GetErrMsg(ex));
        }

        /// <summary>
        /// 异常日志方法重载
        /// </summary>
        /// <param name="message">日志内容</param>
        public static void WriteLog(string message)
        {
            WriteLog(errLogSavePath, message);
        }

        /// <summary>
        /// 异常日志方法重载
        /// </summary>
        /// <param name="filepath">日志文件路径</param>
        /// <param name="message">日志内容</param>
        public static void WriteLog(string filepath, string message)
        {
            try
            {
                if (!Directory.Exists(filepath))
                {
                    Directory.CreateDirectory(filepath);
                }
                string filename = DateTime.Now.ToString("yyyy-MM-dd") + ".txt";
                using (StreamWriter sw = new StreamWriter(filepath + "\\" + filename, true))
                {
                    sw.WriteLine("--------------------------------------------");
                    sw.WriteLine($"{DateTime.Now.ToLongTimeString()}:{DateTime.Now.Millisecond}\t{message}");
                    sw.Close();
                }
            }
            catch (Exception ex)
            {
                throw new Exception(GetErrMsg(ex));
            }
        }

        /// <summary>
        /// 获取异常详细信息
        /// </summary>
        /// <param name="ex"></param>
        /// <returns></returns>
        private static string GetErrMsg(Exception ex)
        {
            string errMessage = "";
            for (Exception tempException = ex; tempException != null; tempException = tempException.InnerException)
            {
                errMessage += tempException.Message + Environment.NewLine + Environment.NewLine;
            }
            errMessage += ex.ToString();
            return errMessage;
        }
    }
复制代码
复制代码
    /// <summary>
    /// 消息队列管理器
    /// </summary>
    public class MqManager : IDisposable
    {
        private MessageQueue _mq = null;
        private readonly LinkType linkType = LinkType.LocalHost;    //链接类型,远程时使用LinkType.RemoteServer。
        private readonly string remoteServer = "192.168.2.165";     //远程服务器IP地址

        public static MqManager LinkServer { get; } = new MqManager();

        /// <summary>
        /// 初始化函数
        /// </summary>
        /// <param name="linkType">链接类型</param>
        public void MqManagerInit(LinkType linkType)
        {
            if (_mq == null)
            {
                string _path;
                if (linkType == LinkType.LocalHost)
                {
                    _path = @".\private$\" + (ConfigurationManager.AppSettings["MSMQName"] ?? "HelloWorld");
                }
                else
                {
                    _path = "FormatName:DIRECT=TCP:" + remoteServer + @"\private$\" + (ConfigurationManager.AppSettings["MSMQName"] ?? "HelloWorld");
                }
                _mq = new MessageQueue(_path)
                {
                    Formatter = new BinaryMessageFormatter()
                };
            }
        }

        /// <summary>
        /// 有参构造函数
        /// </summary>
        public MqManager()
        {
            MqManagerInit(linkType);
        }

        /// <summary>
        /// 发送消息队列(事务)
        /// </summary>
        /// <param name="message"></param>
        public void Send(MqMessage message)
        {
            MessageQueueTransaction transaction = new MessageQueueTransaction();
            transaction.Begin();
            _mq.Send(message.Body, message.Label, transaction);
            transaction.Commit();
        }

        /// <summary>
        /// 接收消息队列
        /// </summary>
        /// <returns></returns>
        public Message Receive()
        {
            Message msg = null;
            try
            {
                msg = _mq.Receive(new TimeSpan(0, 0, 1));
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message);
            }

            return msg;
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            if (_mq != null)
            {
                _mq.Close();
                _mq.Dispose();
                _mq = null;
            }
        }
    }

    /// <summary>
    /// 链接类型
    /// </summary>
    public enum LinkType
    {
        LocalHost = 1,      //本地服务器
        RemoteServer = 2    //远程服务器
    }
复制代码

    2.2.3、Send项目

复制代码
    class Program
    {
        static void Main(string[] args)
        {
            MqMessage mqMessage = new MqMessage();
            List<string> list = new List<string>();

            Console.WriteLine("请输入内容按回车发送,多个内容请用英文逗号隔开,退出请输入Exit。");
            string receiveKey = Console.ReadLine();

            while (receiveKey.ToLower() != "exit")
            {
                if (receiveKey.Length > 0)
                {
                    mqMessage.Label = Guid.NewGuid().ToString();

                    list.Clear();
                    list = receiveKey.Split(new char[] { ',' }).ToList();
                    mqMessage.Body.Clear();
                    mqMessage.Body.Add(CommandType.Create, list);
                    try
                    {
                        MqManager.LinkServer.Send(mqMessage);
                        Console.WriteLine("内容已发送成功。");
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                        LogHelper.WriteLog(ex);
                    }
                }
                receiveKey = Console.ReadLine();
            }

            MqManager.LinkServer.Dispose();
        }
    }
复制代码

    2.2.4、Receive项目

复制代码
    /// <summary>
    /// 接收消息队列管理(线程)
    /// </summary>
    public class ReceiveManager : IDisposable
    {
        private Thread _thread = null;

        public static ReceiveManager Instance { get; set; } = new ReceiveManager();

        /// <summary>
        /// 开始
        /// </summary>
        public void Start()
        {
            StartReceive();
        }

        /// <summary>
        /// 接收线程
        /// </summary>
        private void StartReceive()
        {
            _thread = new Thread(new ThreadStart(Receive))
            {
                Name = "ReceiveThread",
                IsBackground = true
            };
            _thread.Start();
        }

        /// <summary>
        /// 接收线程调用方法
        /// </summary>
        private void Receive()
        {
            Message msg = null;
            while (true)
            {
                try
                {
                    msg = MqManager.LinkServer.Receive();
                    if (msg != null)
                    {
                        Console.WriteLine("----------------------------------------------------");
                        Console.WriteLine("Lable: " + msg.Label);
                        Dictionary<CommandType, List<string>> keyValuePairs = msg.Body as Dictionary<CommandType, List<string>>;
                        Console.WriteLine("Body CommandType: " + keyValuePairs.Keys.First());
                        Console.WriteLine("Body Details: ");
                        foreach (var item in keyValuePairs.Values.First())
                        {
                            Console.WriteLine(item);
                        }
                        Console.WriteLine("----------------------------------------------------");
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    LogHelper.WriteLog(ex);
                }
                Thread.Sleep(1000);
            }
        }

        /// <summary>
        /// 结束
        /// </summary>
        public void Stop()
        {
            Dispose();
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            try
            {
                if (_thread != null)
                {
                    _thread.Abort();
                    _thread.Join();
                    _thread = null;
                }

                MqManager.LinkServer.Dispose();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
复制代码
复制代码
    class Program
    {
        static void Main(string[] args)
        {
            ReceiveManager.Instance.Start();
            Console.WriteLine("退出请输入Exit");
            string receiveKey = Console.ReadLine();
            while (receiveKey.ToLower() != "exit")
            {
                receiveKey = Console.ReadLine();
            }
            ReceiveManager.Instance.Stop();
            Console.Read();
        }
    }
复制代码

    2.3、运行测试

    客户端发送hello,world:

    服务端接收到的信息:

    三、工作组下的远程C/S

    3.1、代码调整

    工作组下的远程C/S,代码已经在上面的示例中提供,将Common\MqManager.cs下的:

    private readonly LinkType linkType = LinkType.LocalHost;改成private readonly LinkType linkType = LinkType.RemoteServer;即可。

    3.2、访问权限

    既然要与远程服务器交互(发送/接收)队列信息,首当其冲的是访问权限问题,没有权限,一切免谈。

    下面讲一下远程服务器(代码中的192.168.2.165,Win7系统)要设置的内容:

    3.2.1、在运行中输入compmgmt.msc->服务和应用程序->消息队列->右键属性->服务器安全性->禁用未经身份验证的 RPC 调用->把勾勾去掉->应用。

    3.2.2、在消息队列->专用队列->新建一个代码中用到的HelloWorld队列,勾上事务性->确定。

    为什么要手工建HelloWorld消息队列?因为要对这个队列进行匿名访问授权,后面会讲到。至于事务性这个勾,这个要与代码相一致。因为本示例中使用了MessageQueueTransaction来发送事务信息,所以必须得勾上这个勾,不然的话,发送时没有任何的报错信息,但是服务器就是收不到队列信息。

    3.2.3、专用队列->HelloWorld->右键属性->安全->ANONYMOUS LOGON->完全控制->应用。

    3.2.4、在运行中输入regedit->HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\MSMQ\Parameters\security->新建两个DWORD值:AllowNonauthenticatedRpc、NewRemoteReadServerDenyWorkgroupClient->分别双击将数值数据改成1。

    3.2.5、关于防火墙,我是关闭了的,假如您的电脑防火墙是打开了的话,请检查一下Message Queuing是不是被允许的?

    3.3、运行测试

    客户端发送A,B,C,D:

    服务器端接收到的信息:


相关教程