Spring AMQP 介绍
AMQP(高级消息队列协议)是一个异步消息传递所使用的应用层协议规范,为面向消息的中间件设计,不受产品和开发语言的限制. Spring AMQP 将核心 Spring 概念应用于基于 AMQP 消息传递解决方案的开发。
RabbitMQ 是基于 AMQP 协议的轻量级、可靠、可扩展、可移植的消息中间件,Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括 spring-boot-starter-amqp “Starter”。
Spring AMQP 集成与配置
-
可通过如下 Docker 命令 安装 RabbiMQ:
|
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management |
-
登录 RabbiMQ 的 web 管理界面,创建虚拟主机
novel
:

-
项目中加入如下的 maven 依赖:
|
<dependency> |
|
<groupId>org.springframework.boot</groupId> |
|
<artifactId>spring-boot-starter-amqp</artifactId> |
|
</dependency> |
-
在 application.yml 配置文件中加入 RabbitMQ 的连接配置:
|
spring: |
|
rabbitmq: |
|
addresses: "amqp://guest:guest@47.106.243.172" |
|
virtual-host: novel |
|
template: |
|
retry: |
|
|
|
enabled: true |
|
|
|
max-attempts: 3 |
|
|
|
initial-interval: "3s" |
-
此时已经可以在 Spring Beans 中注入 AmqpTemplate 发送消息了。
Spring AMQP 使用示例
-
在
io.github.xxyopen.novel.core.constant
包下创建 AMQP 相关常量类:
|
|
|
|
|
|
|
|
|
|
|
|
|
public class AmqpConsts { |
|
|
|
|
|
|
|
|
|
public static class BookChangeMq{ |
|
|
|
|
|
|
|
|
|
public static final String EXCHANGE_NAME = "EXCHANGE-BOOK-CHANGE"; |
|
|
|
|
|
|
|
|
|
public static final String QUEUE_ES_UPDATE = "QUEUE-ES-BOOK-UPDATE"; |
|
|
|
|
|
|
|
|
|
public static final String QUEUE_REDIS_UPDATE = "QUEUE-REDIS-BOOK-UPDATE"; |
|
|
|
|
|
|
|
} |
|
|
|
} |
-
在
io.github.xxyopen.novel.core.config
包下创建 AMQP 配置类,配置各个交换机、队列以及绑定关系:
|
|
|
|
|
|
|
|
|
|
|
|
|
@Configuration |
|
public class AmqpConfig { |
|
|
|
|
|
|
|
|
|
@Bean |
|
public FanoutExchange bookChangeExchange() { |
|
return new FanoutExchange(AmqpConsts.BookChangeMq.EXCHANGE_NAME); |
|
} |
|
|
|
|
|
|
|
|
|
@Bean |
|
public Queue esBookUpdateQueue() { |
|
return new Queue(AmqpConsts.BookChangeMq.QUEUE_ES_UPDATE); |
|
} |
|
|
|
|
|
|
|
|
|
@Bean |
|
public Binding esBookUpdateQueueBinding() { |
|
return BindingBuilder.bind(esBookUpdateQueue()).to(bookChangeExchange()); |
|
} |
|
|
|
|
|
|
|
} |
-
在
io.github.xxyopen.novel.manager.mq
包下创建 AMQP 消息管理类,用来发送各种 AMQP 消息:
|
|
|
|
|
|
|
|
|
|
|
|
|
@Component |
|
@RequiredArgsConstructor |
|
public class AmqpMsgManager { |
|
|
|
private final AmqpTemplate amqpTemplate; |
|
|
|
@Value("${spring.amqp.enable}") |
|
private String enableAmqp; |
|
|
|
|
|
|
|
|
|
public void sendBookChangeMsg(Long bookId) { |
|
if (Objects.equals(enableAmqp, CommonConsts.TRUE)) { |
|
sendAmqpMessage(amqpTemplate, AmqpConsts.BookChangeMq.EXCHANGE_NAME, null, bookId); |
|
} |
|
} |
|
|
|
private void sendAmqpMessage(AmqpTemplate amqpTemplate, String exchange, String routingKey, Object message) { |
|
|
|
if (TransactionSynchronizationManager.isActualTransactionActive()) { |
|
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { |
|
@Override |
|
public void afterCommit() { |
|
amqpTemplate.convertAndSend(exchange, routingKey, message); |
|
} |
|
}); |
|
return; |
|
} |
|
amqpTemplate.convertAndSend(exchange, routingKey, message); |
|
} |
|
|
|
} |
-
在小说信息更新后,发送 AMQP 消息:
|
@Transactional(rollbackFor = Exception.class) |
|
@Override |
|
public RestResp<Void> saveBookChapter(ChapterAddReqDto dto) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
amqpMsgManager.sendBookChangeMsg(dto.getBookId()); |
|
return RestResp.ok(); |
|
} |
-
在
io.github.xxyopen.novel.core.listener
包下创建 Rabbit 队列监听器,监听各个 RabbitMQ 队列的消息并处理:
|
|
|
|
|
|
|
|
|
|
|
|
|
@Component |
|
@RequiredArgsConstructor |
|
@Slf4j |
|
public class RabbitQueueListener { |
|
|
|
private final BookInfoMapper bookInfoMapper; |
|
|
|
private final ElasticsearchClient esClient; |
|
|
|
|
|
|
|
|
|
@RabbitListener(queues = AmqpConsts.BookChangeMq.QUEUE_ES_UPDATE) |
|
@SneakyThrows |
|
public void updateEsBook(Long bookId) { |
|
BookInfo bookInfo = bookInfoMapper.selectById(bookId); |
|
IndexResponse response = esClient.index(i -> i |
|
.index(EsConsts.BookIndex.INDEX_NAME) |
|
.id(bookInfo.getId().toString()) |
|
.document(EsBookDto.build(bookInfo)) |
|
); |
|
log.info("Indexed with version " + response.version()); |
|
} |
|
|
|
|
|
|
|
} |
此时,如果需要更新其它小说副本数据,只需要配置更新队列和增加监听器,不需要在小说信息变更的地方增加任何业务代码,而且任意小说副本的数据刷新之间互不影响,真正实现了模块间的解耦。
原文链接:https://www.cnblogs.com/xxyopen/p/16350743.html