-
C#教程之C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率
本站最新发布 C#从入门到精通
试听地址 https://www.xin3721.com/eschool/CSharpxin3721/
试听地址 https://www.xin3721.com/eschool/CSharpxin3721/
一、引言
使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。本例使用多线程来创建多信道并绑定队列,达到多workers的目的。
二、示例
2.1、环境准备
在NuGet上安装RabbitMQ.Client。
2.2、工厂类
添加一个工厂类RabbitMQFactory:

/// <summary> /// 多路复用技术(Multiplexing)目的:为了避免创建多个TCP而造成系统资源的浪费和超载,从而有效地利用TCP连接。 /// </summary> public static class RabbitMQFactory { private static IConnection sharedConnection; private static int ChannelCount { get; set; } private static readonly object _locker = new object(); public static IConnection SharedConnection { get { if (ChannelCount >= 1000) { if (sharedConnection != null && sharedConnection.IsOpen) { sharedConnection.Close(); } sharedConnection = null; ChannelCount = 0; } if (sharedConnection == null) { lock (_locker) { if (sharedConnection == null) { sharedConnection = GetConnection(); ChannelCount++; } } } return sharedConnection; } } private static IConnection GetConnection() { var factory = new ConnectionFactory { HostName = "192.168.2.242", UserName = "hello", Password = "world", Port = AmqpTcpEndpoint.UseDefaultPort,//5672 VirtualHost = ConnectionFactory.DefaultVHost,//使用默认值:"/" Protocol = Protocols.DefaultProtocol, AutomaticRecoveryEnabled = true }; return factory.CreateConnection(); } }
2.3、主窗体
代码如下:

public partial class RabbitMQMultithreading : Form { public delegate void ListViewDelegate<T>(T obj); public RabbitMQMultithreading() { InitializeComponent(); } /// <summary> /// ShowMessage重载 /// </summary> /// <param name="msg"></param> private void ShowMessage(string msg) { if (InvokeRequired) { BeginInvoke(new ListViewDelegate<string>(ShowMessage), msg); } else { ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), msg }); lvwMsg.Items.Insert(0, item); } } /// <summary> /// ShowMessage重载 /// </summary> /// <param name="format"></param> /// <param name="args"></param> private void ShowMessage(string format, params object[] args) { if (InvokeRequired) { BeginInvoke(new MethodInvoker(delegate () { ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) }); lvwMsg.Items.Insert(0, item); })); } else { ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) }); lvwMsg.Items.Insert(0, item); } } /// <summary> /// 生产者 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void btnSend_Click(object sender, EventArgs e) { int messageCount = 100; var factory = new ConnectionFactory { HostName = "192.168.2.242", UserName = "hello", Password = "world", Port = AmqpTcpEndpoint.UseDefaultPort,//5672 VirtualHost = ConnectionFactory.DefaultVHost,//使用默认值:"/" Protocol = Protocols.DefaultProtocol, AutomaticRecoveryEnabled = true }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World"; var body = Encoding.UTF8.GetBytes(message); for (int i = 1; i <= messageCount; i++) { channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); ShowMessage($"Send {message}"); } } } } /// <summary> /// 消费者 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private async void btnReceive_Click(object sender, EventArgs e) { Random random = new Random(); int rallyNumber = random.Next(1, 1000); int channelCount = 0; await Task.Run(() => { try { int asyncCount = 10; List<Task<bool>> tasks = new List<Task<bool>>(); var connection = RabbitMQFactory.SharedConnection; for (int i = 1; i <= asyncCount; i++) { tasks.Add(Task.Factory.StartNew(() => MessageWorkItemCallback(connection, rallyNumber))); } Task.WaitAll(tasks.ToArray()); string syncResultMsg = $"集结号 {rallyNumber} 已吹起号角--" + $"本次开启信道成功数:{tasks.Count(s => s.Result == true)}," + $"本次开启信道失败数:{tasks.Count() - tasks.Count(s => s.Result == true)}" + $"累计开启信道成功数:{channelCount + tasks.Count(s => s.Result == true)}"; ShowMessage(syncResultMsg); } catch (Exception ex) { ShowMessage($"集结号 {rallyNumber} 消费异常:{ex.Message}"); } }); } /// <summary> /// 异步方法 /// </summary> /// <param name="state"></param> /// <param name="rallyNumber"></param> /// <returns></returns> private bool MessageWorkItemCallback(object state, int rallyNumber) { bool syncResult = false; IModel channel = null; try { IConnection connection = state as IConnection; //不能使用using (channel = connection.CreateModel())来创建信道,让RabbitMQ自动回收channel。 channel = connection.CreateModel(); channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Thread.Sleep(1000); ShowMessage($"集结号 {rallyNumber} Received {message}"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer); syncResult = true; } catch (Exception ex) { syncResult = false; ShowMessage(ex.Message); } return syncResult; } }
2.4、运行结果
多点几次消费者即可增加信道,提升消费能力。
栏目列表
最新更新
Winform中怎样跨窗体获取另一窗体的控件对
Winform中使用FastReport的PictureObject时通过代
三分钟掌握,使用Quqrtz.Net实现定时发送邮
NET/Regex 处理连续空格
QR 码详解(下)
C#中的等值判断1
C#编写了一个基于Lucene.Net的搜索引擎查询
使用FastReport报表工具生成报表PDF文档
基于JieBaNet+Lucene.Net实现全文搜索
C#取视频某一帧图片
.Net Standard(.Net Core)实现获取配置信息
Linux PXE + Kickstart 自动装机
Shell 编程 基础
Shell 编程 条件语句
CentOS8-网卡配置及详解
Linux中LVM逻辑卷管理
1.数码相框-相框框架分析(1)
Ubuntu armhf 版本国内源
Linux中raid磁盘阵列
搭建简易网站
Dubbo(五):深入理解Dubbo核心模型Invok
vfp教程之VFP与Excel交互编程
vfp教程之在VFP中实现跟变式组合框及椭圆
SQL SERVER查询数据库所有表的大小,按照记
使用 SQL 服务器时,"评估期已过期"错
sql server无法连接本地服务器
使用sql语句创建表
VB操作Access数据库小记 ————————
access数据库远程连接
java web操作Access数据库