VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > c#编程 >
  • C#网络编程之消息队列(RabbitMQ、Kafka、Redis Pub/Sub)

第18章 微服务架构基础
18.3 消息队列(RabbitMQ、Kafka、Redis Pub/Sub)
一、我踩过的消息队列坑:从丢失订单到削峰填谷
刚做电商系统时,为了图快用Redis Pub/Sub做订单通知——结果Redis重启后,未消费的消息全丢了,导致100多笔订单用户没收到通知,赔了不少优惠券;后来换成RabbitMQ,又没设置消息持久化,服务器崩溃后消息再次丢失;再后来用Kafka做日志收集,没设置副本,Broker挂了整个集群不可用,日志断了3小时。踩过这些坑后,我才明白:消息队列不是“拿来就能用”的工具,必须搞懂每个组件的特性、适用场景和最佳实践。这节vb.net教程C#教程python教程SQL教程access 2010教程我把自己从“消息队列小白”到“实战专家”的血泪史揉进去,用大白话讲透RabbitMQ、Kafka、Redis Pub/Sub的核心原理,结合C#实战代码逐行讲解,以及常见坑和最佳实践,让你的消息队列既可靠又高效。
二、先搞懂消息队列的核心价值:解耦、异步、削峰
消息队列的核心是“异步通信”——生产者发送消息到队列,消费者从队列取消息处理,生产者和消费者不需要直接通信。它解决了微服务的三个核心痛点:

  1. 解耦:像“快递驿站”,收发双方不用见面
    问题:订单服务完成后,需要通知库存服务、支付服务、短信服务——如果直接调用,订单服务和其他服务耦合度高,任何一个服务改接口,订单服务都要改。
    解决:订单服务发送消息到队列,其他服务从队列取消息,订单服务不用关心谁来消费,其他服务也不用关心消息从哪来。
  2. 异步:像“外卖点餐”,不用等餐做好就能做别的
    问题:用户下单后,订单服务要调用库存扣减、支付验证、短信通知——如果同步调用,整个流程需要3秒,用户体验差。
    解决:订单服务发送消息到队列,直接返回用户“下单成功”,其他服务异步处理,用户等待时间缩短到0.5秒以内。
  3. 削峰:像“电影院检票”,把人流分散到不同时间
    问题:大促时,每秒1000个下单请求,订单服务处理不过来,直接崩溃。
    解决:消息队列把请求存起来,消费者慢慢处理,比如每秒处理200个,避免服务崩溃。
    三、RabbitMQ:可靠消息传递的“快递驿站”
    RabbitMQ是基于AMQP协议的消息队列,核心特性是可靠消息传递——支持持久化、确认机制、死信队列、复杂路由,适合关键业务场景(比如订单、支付),确保消息不丢失。
    核心概念(拓展知识)
    生产者(Producer):发送消息的服务,比如订单服务;
    消费者(Consumer):处理消息的服务,比如库存服务;
    交换机(Exchange):接收生产者的消息,根据路由规则转发到队列;
    队列(Queue):存储消息,消费者从队列取消息;
    绑定(Binding):把交换机和队列绑定,指定路由规则;
    确认机制:生产者确认消息发送到交换机/队列,消费者确认消息处理完成。
    RabbitMQ的交换机类型:
    | 交换机类型 | 路由规则 | 适用场景 |
    | ---- | ---- | ---- |
    | Fanout | 把消息转发到所有绑定的队列 | 发布订阅(比如广播通知) |
    | Direct | 按路由键(Routing Key)精确匹配 | 路由(比如订单状态变更) |
    | Topic | 按路由键的模糊匹配(*匹配一个词,#匹配多个词) | 复杂路由(比如按地区、类型转发) |
    | Headers | 按消息头匹配(很少用) | 特殊场景 |

实战1:RabbitMQ可靠消息传递(C#实战)
用RabbitMQ.Client库实现订单消息的可靠传递,包括持久化、生产者确认、消费者确认、死信队列。
步骤1:安装RabbitMQ.Client NuGet包
bash
Install-Package RabbitMQ.Client
步骤2:生产者发送可靠消息
csharp

	using RabbitMQ.Client;
	using System.Text;
	
	namespace RabbitMQProducer;
	
	class Program
	{
	static void Main(string[] args)
	{
	// 1. 创建连接工厂
	var factory = new ConnectionFactory
	{
	HostName = "localhost", // RabbitMQ服务器地址
	UserName = "guest",
	Password = "guest",
	VirtualHost = "/",
	// 启用生产者确认机制
	PublisherConfirms = true,
	// 启用生产者返回机制(消息无法路由时返回)
	PublisherReturns = true
	};
	
	using var connection = factory.CreateConnection();
	using var channel = connection.CreateModel();
	
	// 2. 声明死信交换机和死信队列(处理消费失败的消息)
	var dlxExchange = "order.dlx.exchange";
	var dlxQueue = "order.dlx.queue";
	channel.ExchangeDeclare(dlxExchange, ExchangeType.Direct, durable: true);
	channel.QueueDeclare(dlxQueue, durable: true, exclusive: false, autoDelete: false);
	channel.QueueBind(dlxQueue, dlxExchange, routingKey: "dlx.order");
	
	// 3. 声明订单交换机和队列,绑定死信队列
	var exchange = "order.exchange";
	var queue = "order.queue";
	var routingKey = "order.create";
	
	// 声明交换机(持久化)
	channel.ExchangeDeclare(exchange, ExchangeType.Direct, durable: true);
	// 声明队列(持久化),指定死信交换机和死信路由键
	var queueArgs = new Dictionary<string, object>
	{
	{ "x-dead-letter-exchange", dlxExchange },
	{ "x-dead-letter-routing-key", "dlx.order" },
	{ "x-message-ttl", 60000 } // 消息超时时间(60秒),超时后进入死信队列
	};
	channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false, arguments: queueArgs);
	// 绑定交换机和队列
	channel.QueueBind(queue, exchange, routingKey);
	
	// 4. 启用生产者确认和返回
	channel.BasicAcks += (sender, e) =>
	{
	Console.WriteLine($"消息已确认:DeliveryTag={e.DeliveryTag}, Multiple={e.Multiple}");
	};
	channel.BasicNacks += (sender, e) =>
	{
	Console.WriteLine($"消息未确认:DeliveryTag={e.DeliveryTag}, Multiple={e.Multiple}, Requeue={e.Requeue}");
	// 这里可以实现消息重试逻辑
	};
	channel.BasicReturn += (sender, e) =>
	{
	Console.WriteLine($"消息无法路由:RoutingKey={e.RoutingKey}, ReplyCode={e.ReplyCode}, ReplyText={e.ReplyText}");
	};
	
	// 5. 发送消息(持久化)
	var message = "订单ID:123456,用户ID:789";
	var body = Encoding.UTF8.GetBytes(message);
	var properties = channel.CreateBasicProperties();
	properties.Persistent = true; // 消息持久化,RabbitMQ重启后消息不丢失
	properties.Expiration = "60000"; // 消息超时时间(和队列的TTL取最小值)
	
	// 发送消息,mandatory=true表示消息无法路由时返回给生产者
	channel.BasicPublish(exchange, routingKey, mandatory: true, properties, body);
	Console.WriteLine($"发送消息:{message}");
	
	Console.ReadLine();
	}
	}

代码逐行讲解:
1.ConnectionFactory:配置RabbitMQ连接信息,启用PublisherConfirms和PublisherReturns,确保消息发送成功;
2.死信队列:处理消费失败或超时的消息——比如消费者处理消息时抛出异常,消息会进入死信队列,后续可以人工处理或重试;
3.持久化设置:交换机、队列、消息都设置durable: true,RabbitMQ重启后消息不丢失;
4.生产者确认:BasicAcks事件表示消息已被RabbitMQ接收,BasicNacks表示消息未被接收,BasicReturn表示消息无法路由;
5.mandatory=true:消息无法路由时,RabbitMQ会把消息返回给生产者,而不是直接丢弃。
我踩过的坑:一开始没设置Persistent=true,RabbitMQ重启后消息全丢了——必须同时设置交换机、队列、消息的持久化,才能确保消息不丢失!
步骤3:消费者可靠消费消息
csharp

	using RabbitMQ.Client;
	using RabbitMQ.Client.Events;
	using System.Text;
	
	namespace RabbitMQConsumer;
	
	class Program
	{
	static void Main(string[] args)
	{
	var factory = new ConnectionFactory
	{
	HostName = "localhost",
	UserName = "guest",
	Password = "guest"
	};
	
	using var connection = factory.CreateConnection();
	using var channel = connection.CreateModel();
	
	var queue = "order.queue";
	// 配置消费者:每次取1条消息,处理完成后再取下一条
	channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
	
	var consumer = new EventingBasicConsumer(channel);
	consumer.Received += (sender, e) =>
	{
	var body = e.Body.ToArray();
	var message = Encoding.UTF8.GetString(body);
	Console.WriteLine($"收到消息:{message}");
	
	try
	{
	// 模拟处理消息:比如扣减库存
	if (message.Contains("123456"))
	{
	// 模拟处理失败,抛出异常,消息进入死信队列
	throw new Exception("库存不足");
	}
	// 处理成功,手动确认消息
	channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
	Console.WriteLine("消息处理完成,已确认");
	}
	catch (Exception ex)
	{
	Console.WriteLine($"消息处理失败:{ex.Message}");
	// 处理失败,拒绝消息,不重新入队(进入死信队列)
	channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: false);
	}
	};
	
	// 手动确认模式:autoAck=false,必须手动调用BasicAck或BasicNack
	channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
	Console.WriteLine("消费者已启动,等待消息...");
	Console.ReadLine();
	}
	}

代码逐行讲解:
1.BasicQos:设置每次取1条消息,处理完成后再取下一条——避免消费者一次性取太多消息,导致内存溢出,也实现了负载均衡(多个消费者时,每个消费者处理1条消息);
2.EventingBasicConsumer:事件驱动的消费者,Received事件表示收到消息;
3.手动确认:autoAck=false,必须手动调用BasicAck(处理成功)或BasicNack(处理失败)——如果不确认,RabbitMQ会认为消息未处理,重新发给其他消费者;
4.BasicNack(requeue: false):处理失败时,拒绝消息并不重新入队,消息进入死信队列——如果requeue: true,消息会重新入队,可能导致无限循环。
我踩过的坑:一开始用autoAck=true,消费者处理消息时崩溃,消息直接丢失——必须用手动确认模式,确保消息处理完成后才从队列删除!
四、Kafka:高吞吐量的“数据高速公路”
Kafka是基于分布式日志的消息队列,核心特性是高吞吐量——每秒可以处理百万级消息,适合大数据场景(比如日志收集、实时监控、大数据分析)。它的设计目标是“以最快的速度传递数据”,牺牲了部分灵活性(比如复杂路由),但换来了极致的性能。
核心概念(拓展知识)
Producer:生产者,发送消息到Topic;
Consumer:消费者,从Topic取消息处理;
Broker:Kafka服务器,存储消息;
Topic:消息的分类,类似RabbitMQ的队列,但Topic是分布式的,分成多个Partition;
Partition:Topic的分区,每个Partition是一个有序的日志文件——Kafka通过分区实现高吞吐量,多个消费者可以同时消费不同的Partition;
副本(Replica):每个Partition有多个副本,一个Leader副本(处理读写),多个Follower副本(同步数据)——Leader挂了,Follower自动切换为Leader,确保高可用;
消费者组(Consumer Group):多个消费者组成一个组,每个Partition只能被组内一个消费者消费——实现负载均衡,避免重复消费。
Kafka的核心优势:
1.高吞吐量:每秒处理百万级消息,适合大数据场景;
2.持久化:消息存在磁盘,支持按时间或大小保留;
3.高可用:副本机制,Broker挂了不影响服务;
4.顺序消费:每个Partition内的消息是有序的,适合需要顺序处理的场景(比如订单状态变更)。
实战2:Kafka高吞吐量消息传递(C#实战)
用Confluent.Kafka库实现生产者发送消息、消费者消费消息,包括分区、副本、消费者组配置。
步骤1:安装Confluent.Kafka NuGet包
bash
Install-Package Confluent.Kafka
步骤2:生产者发送消息(带分区和回调)
csharp

	using Confluent.Kafka;
	using System;
	
	namespace KafkaProducer;
	
	class Program
	{
	static void Main(string[] args)
	{
	// 1. 生产者配置
	var config = new ProducerConfig
	{
	BootstrapServers = "localhost:9092", // Kafka Broker地址
	Acks = Acks.All, // 所有副本确认后才返回成功,确保消息不丢失
	Retries = 3, // 发送失败重试3次
	BatchSize = 16384, // 批量发送的大小(16KB)
	LingerMs = 5, // 等待5毫秒,攒够批量再发送,提高吞吐量
	CompressionType = CompressionType.Gzip, // 启用Gzip压缩,减少网络传输
	ClientId = "order-producer-1" // 生产者ID,用于监控
	};
	
	// 2. 创建生产者
	using var producer = new ProducerBuilder<Null, string>(config).Build();
	
	try
	{
	// 3. 发送消息到Topic,指定分区键(订单ID)
	var topic = "order-topic";
	var message = "订单ID:123456,用户ID:789";
	// 分区键相同的消息会发送到同一个Partition,保证顺序消费
	var deliveryResult = producer.ProduceAsync(topic, new Message<Null, string> { Value = message }, new PartitionKey("123456")).Result;
	
	Console.WriteLine($"发送消息成功:Topic={deliveryResult.Topic}, Partition={deliveryResult.Partition}, Offset={deliveryResult.Offset}, Timestamp={deliveryResult.Timestamp}");
	}
	catch (ProduceException<Null, string> ex)
	{
	Console.WriteLine($"发送消息失败:{ex.Error.Reason}");
	}
	
	// 4. 刷新消息,确保所有消息都发送成功
	producer.Flush(TimeSpan.FromSeconds(10));
	}
	}

代码逐行讲解:
1.ProducerConfig:配置Kafka生产者,Acks=All表示所有副本确认后才返回成功,确保消息不丢失;Retries=3表示发送失败重试3次;
2.PartitionKey:指定分区键,相同分区键的消息会发送到同一个Partition,保证顺序消费——比如同一个订单的状态变更消息,会发送到同一个Partition,消费者可以按顺序处理;
3.ProduceAsync:异步发送消息,返回DeliveryResult,包含Topic、Partition、Offset等信息;
4.Flush:刷新消息,确保所有消息都发送到Kafka,避免程序退出时消息丢失。
我踩过的坑:一开始没设置Acks=All,Broker挂了消息丢失——关键业务必须设置Acks=All,牺牲一点性能换可靠性!
步骤3:消费者消费消息(消费者组+批量消费)
csharp

	using Confluent.Kafka;
	using System;
	using System.Threading;
	
	namespace KafkaConsumer;
	
	class Program
	{
	static void Main(string[] args)
	{
	// 1. 消费者配置
	var config = new ConsumerConfig
	{
	BootstrapServers = "localhost:9092",
	GroupId = "order-consumer-group", // 消费者组ID,相同组的消费者共享Partition
	AutoOffsetReset = AutoOffsetReset.Earliest, // 没有偏移量时,从最早的消息开始消费
	EnableAutoCommit = false, // 手动提交偏移量,确保消息处理完成后再提交
	FetchMinBytes = 1024, // 批量拉取的最小大小(1KB)
	FetchMaxWaitMs = 500, // 等待500毫秒,攒够批量再拉取,提高吞吐量
	ClientId = "order-consumer-1"
	};
	
	// 2. 创建消费者
	using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
	// 订阅Topic
	consumer.Subscribe("order-topic");
	
	var cancellationTokenSource = new CancellationTokenSource();
	Console.CancelKeyPress += (sender, e) =>
	{
	e.Cancel = true;
	cancellationTokenSource.Cancel();
	};
	
	try
	{
	while (!cancellationTokenSource.Token.IsCancellationRequested)
	{
	try
	{
	// 3. 批量拉取消息,最多拉取100条,等待1秒
	var consumeResult = consumer.Consume(cancellationTokenSource.Token);
	var message = consumeResult.Message.Value;
	Console.WriteLine($"收到消息:Topic={consumeResult.Topic}, Partition={consumeResult.Partition}, Offset={consumeResult.Offset}, Message={message}");
	
	// 模拟处理消息
	Thread.Sleep(100);
	
	// 4. 手动提交偏移量,确保消息处理完成后再提交
	consumer.Commit(consumeResult);
	Console.WriteLine($"提交偏移量:Partition={consumeResult.Partition}, Offset={consumeResult.Offset}");
	}
	catch (ConsumeException ex)
	{
	Console.WriteLine($"消费消息失败:{ex.Error.Reason}");
	}
	}
	}
	finally
	{
	consumer.Close();
	}
	}
	}

代码逐行讲解:
1.GroupId:消费者组ID,相同组的消费者共享Partition——比如2个消费者在同一个组,Topic有2个Partition,每个消费者消费1个Partition,实现负载均衡;
2.AutoOffsetReset=Earliest:没有偏移量时,从最早的消息开始消费——避免新消费者错过历史消息;
3.EnableAutoCommit=false:手动提交偏移量,确保消息处理完成后再提交——如果不提交,消费者重启后会重新消费之前的消息;
4.Consume:拉取消息,默认批量拉取,提高吞吐量——也可以用ConsumeBatch拉取多个消息。
我踩过的坑:一开始用EnableAutoCommit=true,消费者处理消息时崩溃,偏移量已经提交,消息丢失——关键业务必须用手动提交偏移量!
五、Redis Pub/Sub:轻量实时的“广播喇叭”
Redis Pub/Sub是Redis的发布订阅功能,核心特性是轻量、简单——适合实时通知场景(比如聊天、推送、实时监控),但不支持持久化、不支持消息确认,不适合关键业务场景。
核心概念(拓展知识)
发布者(Publisher):发送消息到频道(Channel);
订阅者(Subscriber):订阅频道,接收消息;
频道(Channel):消息的分类,发布者发送消息到频道,订阅者从频道取消息。
Redis Pub/Sub的优缺点:

优点 缺点
轻量、简单,部署成本低 不支持持久化,Redis重启消息丢失
实时性高,延迟低 不支持消息确认,订阅者离线消息丢失
支持模式匹配订阅 不支持消息堆积,订阅者必须在线

实战3:Redis Pub/Sub实时通知(C#实战)
用StackExchange.Redis库实现发布者发送消息、订阅者接收消息。
步骤1:安装StackExchange.Redis NuGet包
bash
Install-Package StackExchange.Redis
步骤2:发布者发送消息
csharp

	using StackExchange.Redis;
	using System;
	
	namespace RedisPublisher;
	
	class Program
	{
	static void Main(string[] args)
	{
	// 1. 连接Redis
	var redis = ConnectionMultiplexer.Connect("localhost:6379");
	var db = redis.GetDatabase();
	
	// 2. 发送消息到频道
	var channel = "chat-channel";
	while (true)
	{
	Console.Write("请输入消息(输入exit退出):");
	var message = Console.ReadLine();
	if (message == "exit")
	{
	break;
	}
	// 发布消息到频道
	db.Publish(channel, message);
	Console.WriteLine($"发送消息到频道 {channel}{message}");
	}
	
	redis.Close();
	}
	}

步骤3:订阅者接收消息
csharp

	using StackExchange.Redis;
	using System;
	
	namespace RedisSubscriber;
	
	class Program
	{
	static void Main(string[] args)
	{
	// 1. 连接Redis
	var redis = ConnectionMultiplexer.Connect("localhost:6379");
	var subscriber = redis.GetSubscriber();
	
	// 2. 订阅频道
	var channel = "chat-channel";
	subscriber.Subscribe(channel, (ch, message) =>
	{
	Console.WriteLine($"收到频道 {ch} 的消息:{message}");
	});
	
	Console.WriteLine($"已订阅频道 {channel},等待消息...");
	Console.ReadLine();
	
	// 3. 取消订阅
	subscriber.Unsubscribe(channel);
	redis.Close();
	}
	}

代码逐行讲解:
1.ConnectionMultiplexer:Redis的连接复用器,是StackExchange.Redis的核心,负责管理Redis连接;
2.Publish:发布消息到频道;
3.Subscribe:订阅频道,回调函数接收消息——支持同时订阅多个频道,也支持模式匹配订阅(比如chat-*订阅所有以chat开头的频道)。
我踩过的坑:用Redis Pub/Sub做订单通知,Redis重启后消息全丢了——Redis Pub/Sub只适合实时通知场景,关键业务必须用RabbitMQ或Kafka!
六、总结:三个消息队列的适用场景对比

特性 RabbitMQ Kafka Redis Pub/Sub
可靠性 高(持久化、确认机制、死信队列) 高(持久化、副本、确认机制) 低(不持久化、不支持确认)
吞吐量 中(每秒10万级) 极高(每秒百万级) 高(每秒10万级)
路由能力 强(支持Fanout、Direct、Topic) 弱(只支持Topic分区) 弱(只支持频道订阅)
适用场景 关键业务(订单、支付)、复杂路由 大数据场景(日志、监控、实时分析) 实时通知(聊天、推送、实时监控)
部署成本 中(需要单独部署) 高(需要集群部署) 低(复用现有Redis)

最佳实践
1.关键业务选RabbitMQ或Kafka:
复杂路由、可靠传递选RabbitMQ;
高吞吐量、大数据场景选Kafka;
2.实时通知选Redis Pub/Sub:轻量、简单,适合不需要持久化的场景;
3.可靠消息传递的通用规则:
启用持久化:RabbitMQ的交换机、队列、消息持久化,Kafka的副本和Acks=All;
启用确认机制:生产者确认、消费者确认;
处理失败消息:用死信队列(RabbitMQ)或重试机制(Kafka);
4.监控告警:监控消息堆积、消费延迟、Broker状态,设置告警阈值;
5.避免消息堆积:设置合理的消费者数量,批量消费,处理失败的消息及时转移到死信队列。
下一节我们会学习微服务的服务治理:熔断、降级、限流,让你的微服务在高并发下依然稳定运行。

 本站原创,转载请注明出处:https://www.xin3721.com/ArticlecSharp/c49545.html


相关教程