-
C#教程之C#队列学习笔记:RabbitMQ安装及使用(2)
本站最新发布 C#从入门到精通
试听地址 https://www.xin3721.com/eschool/CSharpxin3721/
void Main(string[] args) { #region Hello World //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.建立信道 using (var channel = connection.CreateModel()) { //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。) channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包 string message = args.Length > 0 ? args[0] : "Hello World"; var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定basicProperties) channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); Console.Read(); } } #endregion } }
试听地址 https://www.xin3721.com/eschool/CSharpxin3721/
void Main(string[] args) { #region Hello World //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.建立信道 using (var channel = connection.CreateModel()) { //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。) channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包 string message = args.Length > 0 ? args[0] : "Hello World"; var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定basicProperties) channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); Console.Read(); } } #endregion } }

class Program { static void Main(string[] args) { #region Hello World //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.创建信道 using (var channel = connection.CreateModel()) { //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。) channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //5.构造消费者实例 var consumer = new EventingBasicConsumer(channel); //6.绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //7.发送消息确认信号(手动消息确认) channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //8.启动消费者(noAck: false 启用消息响应) channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
四、Exchange
上面的示例,生产者和消费者直接是通过相同队列名称进行匹配衔接的。消费者订阅某个队列,生产者创建消息发布到队列中,队列再将消息转发到订阅的消费者。这样就会有一个局限性,即消费者一次只能发送消息到某一个队列。
那消费者如何才能发送消息到多个消息队列呢?
RabbitMQ提供了Exchange,它类似于路由器的功能,对消息进行路由,将消息发送到多个队列上。Exchange一方面从生产者接收消息,另一方面将消息推送到队列。但是Exchange是如何知道将消息附加到哪个队列或者直接忽略的呢?这些其实是由Exchange Type来定义的。关于Exchange的图文介绍,请看上一篇《C#队列学习笔记:RabbitMQ基础知识》,此处仅提供示例代码。
4.1、fanout

class Program { static void Main(string[] args) { #region fanout exchange type //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.建立信道 using (var channel = connection.CreateModel()) { //4.使用fanout exchange type,指定exchange名称。 channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包 for (int i = 1; i <= 10; i++) { string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i); var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定exchange;fanout类型无需指定routingKey;指定basicProperties) channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); } Console.Read(); } } #endregion } }

class Program { static void Main(string[] args) { #region fanout exchange type //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.创建信道 using (var channel = connection.CreateModel()) { //4.使用fanout exchange type,指定exchange名称。 channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout"); //5.声明队列(随机生成队列名称) var queueName = channel.QueueDeclare().QueueName; //绑定队列到指定fanout类型exchange,fanout类型无需指定routingKey。 channel.QueueBind(queue: queueName, exchange: "fanoutEC", routingKey: ""); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6.构造消费者实例 var consumer = new EventingBasicConsumer(channel); //7.绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //8.发送消息确认信号(手动消息确认) channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //9.启动消费者(noAck: false 启用消息响应) channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
4.2、direct

class Program { static void Main(string[] args) { #region direct exchange type //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.建立信道 using (var channel = connection.CreateModel()) { //4.使用direct exchange type,指定exchange名称。 channel.ExchangeDeclare(exchange: "directEC", type: "direct"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包 for (int i = 1; i <= 10; i++) { string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i); var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定exchange;direct类型必须指定routingKey;指定basicProperties) channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); } Console.Read(); } } #endregion } }

class Program { static void Main(string[] args) { #region direct exchange type //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.创建信道 using (var channel = connection.CreateModel()) { //4.使用direct exchange type,指定exchange名称。 channel.ExchangeDeclare(exchange: "directEC", type: "direct"); //5.声明队列(随机生成队列名称) var queueName = channel.QueueDeclare().QueueName; //绑定队列到指定direct类型exchange,direct类型必须指定routingKey。 channel.QueueBind(queue: queueName, exchange: "directEC", routingKey: "green"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6.构造消费者实例 var consumer = new EventingBasicConsumer(channel); //7.绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //8.发送消息确认信号(手动消息确认) channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //9.启动消费者(noAck: false 启用消息响应) channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
4.3、topic

class Program { static void Main(string[] args) { #region topic exchange type //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.建立信道 using (var channel = connection.CreateModel()) { //4.使用topic exchange type,指定exchange名称。 channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包 for (int i = 1; i <= 10; i++) { string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i); var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定exchange;topic类型必须指定routingKey;指定basicProperties) channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); } Console.Read(); } } #endregion } }

class Program { static void Main(string[] args) { #region topic exchange type //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.创建信道 using (var channel = connection.CreateModel()) { //4.使用topic exchange type,指定exchange名称。 channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); //5.声明队列(随机生成队列名称) var queueName = channel.QueueDeclare().QueueName; //绑定队列到指定topic类型exchange,topic类型必须指定routingKey。 channel.QueueBind(queue: queueName, exchange: "topicEC", routingKey: "#.*.fast"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6.构造消费者实例 var consumer = new EventingBasicConsumer(channel); //7.绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //8.发送消息确认信号(手动消息确认) channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //9.启动消费者(noAck: false 启用消息响应) channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
五、RPC
RPC--Remote Procedure Call,远程过程调用。RabbitMQ是如何进行远程调用的呢?示意图如下:
第一步:主要是进行远程调用的客户端需要指定接收远程回调的队列,并声明消费者监听此队列。
第二步:远程调用的服务端除了要声明消费端接收远程调用请求外,还要将结果发送到客户端用来监听结果的队列中去。

栏目列表
最新更新
C# 面向对象
假设客车的座位数是9行4列,使用二维数
C#基于接口设计三层架构Unity篇
C#线程 入门
C#读取静态类常量属性和值
C# 插件式编程
C# 委托与事件有啥区别?
C#队列学习笔记:队列(Queue)和堆栈(Stack
linq 多表分组左连接查询查询统计
C#队列学习笔记:MSMQ入门一
C# 在Word中添加Latex 数学公式和符号
inncheck命令 – 检查语法
基于UDP的服务器端和客户端
再谈UDP和TCP
在socket编程中使用域名
网络数据传输时的大小端问题
socket编程实现文件传输功能
如何优雅地断开TCP连接?
图解TCP四次握手断开连接
详细分析TCP数据的传输过程
SqlServer 利用游标批量更新数据
BOS只读状态修改
SQL Server等待事件—PAGEIOLATCH_EX
数据库多行转换为单一列
获取数据表最后最后访问,修改,更新,
计算经历的时间
SQL查询结果自定义排序
修改数据库默认位置
日期简单加或减
从日期获取年,月或日