VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > Java教程 >
  • spring cloud stream 3.1.2 源码搭配rocketmq学习 (四)

spring-cloud-stream的spring.factories#

在这我们发现有一个 BindingServiceConfiguration 的自动装载的类.

主要绑定是在 outputBindingLifecycle 和 inputBindingLifecycle

发现这两个类继承了SmartLifecycle

废话不多说, 多说都是废话

直接看start方法


Copy
public void start() { ... this.bindables.values().forEach(this::doStartWithBindable); ... }

直接就盯着 doStartWithBindable看!!!

发现这个方法是实现类实现的.

我们以input为例子

看到InputBindingLifecycle#doStartWithBindable


Copy
void doStartWithBindable(Bindable bindable) { Collection<Binding<Object>> bindableBindings = bindable.createAndBindInputs(this.bindingService); }

有一个createAndBindInputs, 直接定位到了 AbstractBindableProxyFactory#createAndBindInputs这个方法(具体技巧大家要不自己悟一下吧).

发现里面有非常重要的方法调用 bindingService.bindConsumer

我们就是在找怎么绑定的.就是你了!


Copy
public <T> Collection<Binding<T>> bindConsumer(T input, String inputName) { ... Binder<T, ConsumerProperties, ?> binder = this.getBinder(inputName, inputClass); ... this.doBindConsumer(binder); ... } // 伪代码 public <T> Binding<T> doBindConsumer(Binder<T, ConsumerProperties, ?> binder) { binder.bindConsumer(); }

啊吧啊吧一堆代码, 终于在其中发现关键的两个方法 getBinder 和 doBindConsumer

会发现这个调用是DefaultBinderFactory#getBinder

他在里面调用了this.context.getBeansOfType(Binder.class)

取出了一个Binder.class类型的bean

记住这个Binder类型.

看doBindConsumer实现主线就是调用的binder的bindConsumer方法.

接下来我们看到rocketmq的注册.


rocketmq注册#

要新版的rocketmq才有下述代码

我们找到spring.binders

发现RocketMQBinderAutoConfiguration里注册了一个Bean: RocketMQMessageChannelBinder

ChannelBinder 又有Channel 又有Binder 就是他了呀

发现他继承的是AbstractBindingLifecycle, 我们看看继承图, 有Binder.

![AbstractBindingLifecycle](AbstractMessageChannelBinder继承关系.png

我们上面是不是get了一个Binder的Bean, 芜湖, 就是他了!

我们调用的bindConsumer就是调用他的bindConsumer呀.

我们看看他的实现是调用的doBindConsumer


Copy
public final Binding<T> bindConsumer(String name, String group, T target, C properties) { return this.doBindConsumer(name, group, target, properties); } protected abstract Binding<T> doBindConsumer(String name, String group, T inputTarget, C properties);

我们看看这个doBindConsumer


Copy
public final Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel, final C properties) throws BinderException { ... consumerEndpoint = this.createConsumerEndpoint(destination, group, properties); consumerEndpoint.setOutputChannel(inputChannel); ... }

发现主要调用的是createConsumerEndpoint

还有把inputChannel设置进了OutputChannel中.

那我们来看看createConsumerEndpoint做了什么.


Copy
//伪代码 protected MessageProducer createConsumerEndpoint() { RocketMQInboundChannelAdapter inboundChannelAdapter = new RocketMQInboundChannelAdapter(destination.getName(), extendedConsumerProperties); }

发现他主要是new了RocketMQInboundChannelAdapter这个类然后对属性进行设置.

我们发现RocketMQInboundChannelAdapter有一个onInit的方法, 他会被IntegrationObjectSupport#afterPropertiesSet调用, 具体说明省略, 有兴趣的同学可以自己挖掘一下吧~~~


Copy
protected void onInit() { this.pushConsumer = RocketMQConsumerFactory.initPushConsumer(this.extendedConsumerProperties); this.pushConsumer.registerMessageListener((msgs, context) -> { return (ConsumeOrderlyStatus)this.consumeMessage(msgs, () -> { ... }, () -> { return ConsumeOrderlyStatus.SUCCESS; }); }); }

初始化了一个Consumer并把rocketmq的监听注册了进去. 写过rocketmq的是不是很熟悉, 这就是原本的写法.

那这个监听做了啥????


Copy
private <R> R consumeMessage(List<MessageExt> messageExtList, Supplier<R> failSupplier, Supplier<R> sucSupplier) { this.sendMessage(message); }

sendMessage??? 发送了出去...!!!!

这时候我们不禁化身柯南, 我们之前把channel和function连接在了一起, 这里是不是直接通过channel发送到了我们的function里!~真相???

离答案原来越近了, 万恶剧透..确实是这样!

我们来看看sendMessage做了什么


Copy
protected void sendMessage(Message<?> messageArg) { this.messagingTemplate.send(this.getRequiredOutputChannel(), message); }

getRequiredOutputChannel看到这个方法是不是想到上面有个方法往adapter里设置了一个channel啊.

想到(三)中channel在function初始化的时候是不是已经和function进行了绑定了, 那外部消息中间件, channel, function???? 芜湖, 真相越来越近了. 三者的联系. 明朗了!!!

那我们就可以猜测流程, 外部消息中间件接收到了消息, 通过channel将信息转发到了function中进行处理.

那我们继续看来验证一下我们的猜测!~

看到messagingTemplate.send这个方法

我们打个断点能发现执行到了GenericMessagingTemplate#doSend这个方法, 发现他最核心的就是调用了channel.send

看过第三章的应该知道这个channel就是DirectWithAttributesChannel, 所以我们看看里面的send方法.

发现在父类AbstractMessageChannel调用了dosend, doSend的实现又在他的子类AbstractSubscribableChannel中.


Copy
protected boolean doSend(Message<?> message, long timeout) { try { return this.getRequiredDispatcher().dispatch(message); } catch (MessageDispatchingException var6) { ... } }

getRequiredDispatcher(), Dispatcher是不是很熟悉, 在(三)中我们往其中handler加入了messageHandler.


Copy
private boolean doDispatch(Message<?> message) { ... Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message); ... handler.handleMessage(message); ... } // 返回一个handler Iterator private Iterator<MessageHandler> getHandlerIterator(Message<?> message) { Set<MessageHandler> handlers = this.getHandlers(); return this.loadBalancingStrategy != null ? this.loadBalancingStrategy.getHandlerIterator(message, handlers) : handlers.iterator(); }

我们再深入看代码会发现其实就是拿出messageHandler对消息进行了处理, messageHandler其实就是框架对function进行的再次封装实际就是调用了我们function...至此, 我们的猜想是不是验证成立了!!!~ 芜湖~ 终于完结, 把所有流程都串起来了!!!

调用流程图传送门


总结#

  1. inputBindingLifecycle/outputBindingLifecycle的Bean进行初始化, 调用getBinder获取到了外部消息中间件的Bean
  2. 调用binder的bindConsumer进行绑定, 并传入了对应的channel.
  3. 外部消息中间件调用createConsumerEndpoint生成一个adapter, 根据配置生成初始化配置, 并把channel设置进adapter中.
  4. adapter初始化被调用, 把消息处理注册进pushConsumer中, 等待消息的触发.
  5. 消息到来触发注册的函数, 调用sendMessage方法.
  6. 使用上述设置的channel进行发送, 会执行在(三)中注册的messageHandler, 调用对应的function.
原文:https://www.cnblogs.com/my1024/p/14899006.html

相关教程