欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

springcloud stream binding 源码与使用学习笔记

程序员文章站 2022-05-15 19:22:50
...
#  前言

​ 刚看到Stream的功能是对接mq产品,以为就是包装一些mq产品接口,实现自动装配后统一使用。但看了一个简单的demo,是使用rabbitMq产品的binder,还有输入输出接口方法通过配置,来对应不同的mq产品。所以作者实现的功能是在自己的channel与mq产品之间做了一个binder,这样方便的改变配置就使用多个mq,也可以方便的换不同的mq。

​ 但是这些stream的channel如何被实现的,实现类是什么,binder又是如何加载进来的,又是如何通过binding操作把两者绑在一起的?什么时候绑的?都值得了解一下。于是简单浏览了源码,并基于一个rabitmq的demo,补充写一个简单的binder进行了测试。

        本文草稿用markdown写的,格式也没过多清除。主要内容包含三部分:分别是基本的stream的使用,源码分析,自定义binder的使用。

# 1. rabbit为例如何使用stream

## 1.1 基本的使用步骤

​ 这个方便找到一些相关的帖子

```java
//1. pom中引入
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>

//2. 定义各个stream自己的通道。Output的发送,Input的接收。这个接口类会被一个配置类,上面通过加@EnableBinding({MessageSource.class})来触发实现类。和@Enable***都差不多。
//这里有三个chnanel,一个接收,两个发送。
public interface MessageSource {
    String NAME = "pao";//管道名称:"pao"
    @Output(NAME)
    MessageChannel pao();
    @Output("liujunTopic")
    MessageChannel liujunTopic();
    @Input("liujunRevc")
    MessageChannel liujunRevc();
}

//3. 配置文件application.properties中相关的内容。
//spring.cloud.stream.bindings.${channel-name}.destination
//bindings后面的就是上面的通道名字,表示这个通道将和哪个mq的binder绑定。destination表示toipc吧。
//相关的binder:rabbit,上面的pox中引入包的meta-inf中的spring.binders文件中有,比如:
//rabbit:org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration。
//前缀【rabbit】可以看到下面的default中有。指明的【RabbitServiceAutoConfiguration】这个自动配置文件中,会产生RabbitMessageChannelBinder.class这个类对象到容器中。
//liujunmq是我自己弄的一个binder,就是测试用的。
spring.cloud.stream.bindings.output.destination = ${kafka.topic}
spring.cloud.stream.bindings.pao.destination = test
spring.cloud.stream.bindings.input.destination = ${kafka.topic}
spring.cloud.stream.defaultBinder=rabbit
spring.cloud.stream.bindings.liujunTopic.binder=liujunmq
#spring.cloud.stream.bindings.liujunTopic.destination=liujunTest
spring.cloud.stream.bindings.liujunRevc.binder=liujunmq

//4. 使用2中的接口,在controller中,可以装配channel,按名字liujunTopic。后面就可以直接用它来发送了。
    @Autowired
    @Qualifier("liujunTopic") // Bean 名称
    private MessageChannel liujunMessageChannel;

    @GetMapping("/message/sendLiujun")
    public Boolean sendLiujun(@RequestParam String message) {
        System.out.println("1. msg received through the httpClient...");
        liujunMessageChannel.send(MessageBuilder.withPayload(message).build());
        return true;
    }

```



## 1.2 测试项目与结果展示

​ 四个红色的文件:有自己加的spring.binders文件,有channel接口,有controller类产生消息,也有消息到了mockmq后,消费消息的类。最下面的红框是日志结果。mockmq只是一个MsgHolder,可以写入消息,可以配置监听,把消息发给监听器。

springcloud stream binding 源码与使用学习笔记
            
    
    博客分类: springcloudstream springcloudstream 

## 1.3 业务过程

​ 用户使用channel发消息,由于channel在binding时给加了一个监听器,监听器收到消息后才发给mq的生产者。(客户用channel发送,channel的监听器收到,再转发给mqProducer)

​ 使用channel接收消息时,由于binding会产生监听,作为mq的消费者,它得到mq消息后用这个channel发送。而用户设置了channel的监听,就收到了消息。(mqConsumer监听消息,用channel发送,客户的channel监听收到)

​ 下面都看看channel是如何产生的,binder是如何进入容器的。binder什么时候给channel加上需要的监听器,或者给mqConsumer加上监听器的。



# 2. 源码分析

## 2.1 从@enableBinding开始

​ 我们知道很多功能都是从@enable***开始的。这个注解可以加上几个channel的接口。

​ 具体是导入这些类:@Import({ BindingServiceConfiguration.class, BindingBeansRegistrar.class, BinderFactoryConfiguration.class,SpelExpressionConverterConfiguration.class })

## 2.2 导入的BindingServiceConfiguration.class

**看名字是【绑定服务】的配置,这是个@Configuration的类。主要看下面三个类:**

  • - new BindingService(bindingServiceProperties, binderFactory);
  • - new OutputBindingLifecycle();-->bindable.bindOutputs(bindingService);
  • - new InputBindingLifecycle();-->bindable.bindInputs(bindingService);


  后面两个都实现了smartlifecycle接口,在容器启动时,也会执行start(),这时会通过bindingService来进行所有的绑定。这就是绑定时机,另外在stop()时,还会用bindingService进行unbinding操作。

**初步看看bindingService的主要操作:**

  • - bindProducer:getBinder得到binder,再用它binder.bindProducer(bindingTarget, output,
  •   producerProperties);---参数主要是stream的channel与属性值。
  • - bindConsumer:getBinder得到binder,再用它binder.bindConsumer(target,
  •   bindingServiceProperties.getGroup(inputName), input,
  •   consumerProperties);---参数主要是stream的channel与属性值。


​ 绑定服务有了,要绑定的两个对象还没有看到。一个channel将与一个mq产品的生产者或者消费者进行绑定。

## 2.3 导入的BindingBeansRegistrar.class

​ 这个是用于注册bean定义的类,用于处理@EnableBinding中的值,也就是channel接口的类,应该就是被绑定的对象了。

```java
//一般对于接口,肯定是动态代理产生一个类。这个类一般通过一个factoryBean的getObject()方法得到。比如duboo中,对接口的实现就是把请求代理成一个远程的消息发送。
//
if (type.isInterface()) {
			RootBeanDefinition rootBeanDefinition = new RootBeanDefinition(BindableProxyFactory.class);
			rootBeanDefinition.addQualifier(new AutowireCandidateQualifier(Bindings.class, parent));
rootBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(type);
			registry.registerBeanDefinition(type.getName(), rootBeanDefinition);
		}
```


​ 重点看BindableProxyFactory这个类,是个工厂bean。

```java
 * {@link FactoryBean} for instantiating the interfaces specified via
 * {@link EnableBinding}
//所有EnableBinding指明的接口的实例化类,实现了工厂bean
//自己又是一个Interceptor,产生代理类
public class BindableProxyFactory implements MethodInterceptor, FactoryBean<Object>, Bindable, InitializingBean
------------------------------------------------------------------
//getObject果然返回代理对象,MethodInterceptor还是this。
	@Override
	public synchronized Object getObject() throws Exception {
		if (this.proxy == null) {
			ProxyFactory factory = new ProxyFactory(this.type, this);
			this.proxy = factory.getProxy();
		}
		return this.proxy;
	}
------------------------------------------------------------------
//看看channel接口的代理对象的方法,执行是如何的?是直接从inputHolders拿到,按名字缓存起来。
	@Override
	public synchronized Object invoke(MethodInvocation invocation) throws Throwable {
		Method method = invocation.getMethod();
...//方法上的Input注解的名字,作为channel的名字。
		Input input = AnnotationUtils.findAnnotation(method, Input.class);
		if (input != null) {
			String name = BindingBeanDefinitionRegistryUtils.getBindingTargetName(input, method);
			boundTarget = this.inputHolders.get(name).getBoundTarget();
			targetCache.put(method, boundTarget);
			return boundTarget;
		}
		else {
...//output略
		}
		return null;
	}
------------------------------------------------------------------
//从invoker方法中,看到代理类是根据Input.class注解的名字,从inputHolders这样一个map中拿到的对象。说明这个对象应该已经存在了。
//在产生代理类产生之前,即调用getObject()之前,早就先加载了相应的boundTarget放map中了。
//果然有afterPropertiesSet方法,它对input与output分别进行了处理,把产生的channel对象放到了inputHolders中。上面的invoke才能拿到。这句是input注解的方法的处理。
BindableProxyFactory.this.inputHolders.put(name,
								new BoundTargetHolder(getBindingTargetFactory(returnType).createInput(name), true));

//按类型得到工厂,再根据名字产生绑定对象。
//关于工厂,第一个导入类中有这个bean,就是BindingTargetFactory,可以生成BindingTarget。
@Bean
public SubscribableChannelBindingTargetFactory channelFactory(
    CompositeMessageChannelConfigurer compositeMessageChannelConfigurer) {
    return new SubscribableChannelBindingTargetFactory(compositeMessageChannelConfigurer);
}

//工厂是这么产生绑定对象的。
SubscribableChannelBindingTargetFactory-->createInput/createOutput-->return SubscribableChannel subscribableChannel = new DirectChannel();

//这个steam的channel对象就是下面的样子,是可以被订阅的。最开始的例子说明发送/接收都可以被订阅(监听)。input与output都是这个,因为都需要一端发,另一端订阅接收。
public class DirectChannel extends AbstractSubscribableChannel
```


​ 上面有了绑定服务,也有了绑定对象了,还缺少绑定者binder。

## 2.4 导入的BinderFactoryConfiguration.class

​ 看名字,这个是binder的工厂,有了工厂,binder就肯定有了。重点看两个bean.

```java
//这个是binder的工厂
@Bean
@ConditionalOnMissingBean(BinderFactory.class)
public DefaultBinderFactory binderFactory() {
	DefaultBinderFactory binderFactory = new DefaultBinderFactory(getBinderConfigurations());
	binderFactory.setDefaultBinder(bindingServiceProperties.getDefaultBinder());
	binderFactory.setListeners(binderFactoryListeners);
	return binderFactory;
}
//------------------------------------------------------------------
//这个是binder的类型注册,从META-INF/spring.binders文件中来,本例中有两个,一个自己写的,一个是rabbit的。
@Bean
@ConditionalOnMissingBean(BinderTypeRegistry.class)
public BinderTypeRegistry binderTypeRegistry(ConfigurableApplicationContext configurableApplicationContext) {
    Map<String, BinderType> binderTypes = new HashMap<>();
...
    try {
        Enumeration<URL> resources = classLoader.getResources("META-INF/spring.binders");
...
        while (resources.hasMoreElements()) {
            URL url = resources.nextElement();
            UrlResource resource = new UrlResource(url);
            for (BinderType binderType : parseBinderConfigurations(classLoader, resource)) {
                binderTypes.put(binderType.getDefaultName(), binderType);
            }
        }
    }
...
    return new DefaultBinderTypeRegistry(binderTypes);
}
```

​ 看一下这个binder工厂:DefaultBinderFactory,以及其中最主要的getBinder方法。

```java
public class DefaultBinderFactory implements BinderFactory, DisposableBean, ApplicationContextAware

//得到binder的方法。其中根据文件中的binder配置类,还产生了一个子容器。再从中取出Binder.class类型的bean。说明每个mq都是一个子容器当中?
//子容器当然可以使用父容器中的对象,父容器也可以通过这个工厂类,得到子容器中的binder。
	private <T> Binder<T, ?, ?> getBinderInstance(String configurationName) {
...
			Properties binderProperties = binderConfiguration.getProperties();
			ArrayList<String> args = new ArrayList<>();
			for (Map.Entry<Object, Object> property : binderProperties.entrySet()) {
				args.add(String.format("--%s=%s", property.getKey(), property.getValue()));
			}
...
			args.add("--spring.main.applicationContextClass=" + AnnotationConfigApplicationContext.class.getName());
			List<Class<?>> configurationClasses = new ArrayList<Class<?>>(
	Arrays.asList(binderConfiguration.getBinderType().getConfigurationClasses()));
			SpringApplicationBuilder springApplicationBuilder = new SpringApplicationBuilder()
					.sources(configurationClasses.toArray(new Class<?>[] {})).bannerMode(Mode.OFF).web(false);
			if (useApplicationContextAsParent) {
				springApplicationBuilder.parent(this.context);
			}
...
			ConfigurableApplicationContext binderProducingContext = springApplicationBuilder
					.run(args.toArray(new String[args.size()]));
			@SuppressWarnings("unchecked")
			Binder<T, ?, ?> binder = binderProducingContext.getBean(Binder.class);
...
			this.binderInstanceCache.put(configurationName, new BinderInstanceHolder(binder, binderProducingContext));
		}
		return (Binder<T, ?, ?>) this.binderInstanceCache.get(configurationName).getBinderInstance();
	}
```




##  2.5 回头看绑定操作-OutputBindingLifecycle

​ 以发送绑定为例子。

```java
//实现了SmartLifecycle,可以随容器启停。实现了ApplicationContextAware,可以方便拿容器中的bean使用。
public class OutputBindingLifecycle implements SmartLifecycle, ApplicationContextAware

////容器启动带着这个也start()。找出bindable进行bindInputs and OutPuts
	@Override
	public void start() {
		if (!running) {
			// retrieve the BindingService lazily, avoiding early initialization
			try {
				BindingService bindingService = this.applicationContext
						.getBean(BindingService.class);
				Map<String, Bindable> bindables = this.applicationContext
						.getBeansOfType(Bindable.class);
				for (Bindable bindable : bindables.values()) {
					bindable.bindOutputs(bindingService);
				}
			}
...
		}
	}

//--------------------------------------------------------
//BindableProxyFactory中执行上面的bindable.bindOutputs(bindingService);
//BindableProxyFactory实现了bindable,正好也是因为它的outputHolders持有所有的outputchannel这些target。通过bindingService来进行。
	@Override
	public void bindOutputs(BindingService bindingService) {
	...
        for (Map.Entry<String, BoundTargetHolder> boundTargetHolderEntry : this.inputHolders.entrySet()) {
            String inputTargetName = boundTargetHolderEntry.getKey();
            BoundTargetHolder boundTargetHolder = boundTargetHolderEntry.getValue();
            if (boundTargetHolder.isBindable()) {
                bindingService.bindConsumer(boundTargetHolder.getBoundTarget(), inputTargetName);
            }
        }
	}
//--------------------------------------------------------
//bindingService.bindConsumer。找到binder,绑定目标。
	public <T> Binding<T> bindProducer(T output, String outputName) {
		String bindingTarget = this.bindingServiceProperties
				.getBindingDestination(outputName);
		Binder<T, ?, ProducerProperties> binder = (Binder<T, ?, ProducerProperties>) getBinder(outputName, output.getClass());
...
		Binding<T> binding = binder.bindProducer(bindingTarget, output,
				producerProperties);
		this.producerBindings.put(outputName, binding);
		return binding;
	}
//--------------------------------------------------------
//getBinder(outputName, output.getClass());用binder工厂,以配置文件与类型为参数。前面说过可能有子容器的问题。
	private <T> Binder<T, ?, ?> getBinder(String channelName, Class<T> bindableType) {
		String binderConfigurationName = this.bindingServiceProperties.getBinder(channelName);
		return binderFactory.getBinder(binderConfigurationName, bindableType);
	}
```





# 3. 测试自定义的binder

​ 直接在一个stream的rabbitmq的demo项目上添加,前面已经显示过一些文件。这里的mq就用一个可接受msg,也可以收到监听时,把msg给它的类代替。

## 3.1 写一个binder与它的配置类及文件

binder类:

```java
public class LiujunMessageChannelBinder implements
		Binder<MessageChannel, ConsumerProperties, ProducerProperties> {
	@Autowired
	MsgHolder msgHolder;//一个mock的mq.应该在父容器中。

	@Override
	public Binding<MessageChannel> bindConsumer(String name, String group,
			MessageChannel inboundBindTarget,
			ConsumerProperties consumerProperties) {
		try {
            //在mq中加一个监听消费者,收到消息就用stream的Channel发出去。真正的客户会监听stream的Channel
			msgHolder.setMsgListener(msg -> {
						System.out.println("4. mqClient listener got the msg from mock-mq,then sent to targetOutput!");
						inboundBindTarget.send(msg);
					});
		} catch (Exception e) {
			e.printStackTrace();
		}
		//返回实现unbinding类
		return () -> {
			System.out.println("Unbinding");
		};
	}

	@Override
	public Binding<MessageChannel> bindProducer(String name,
			MessageChannel outputChannel, ProducerProperties producerProperties) {
		Assert.isInstanceOf(SubscribableChannel.class, outputChannel,
				"Binding is supported only for SubscribableChannel instances");
		SubscribableChannel subscribableChannel = (SubscribableChannel) outputChannel;
		//监听真正用户用Channel发来send来的消息,再发(设置)给mq。
        subscribableChannel.subscribe(message -> {
//			Object messageBody = message.getPayload();
			System.out.println("2. Input subscriber get the msg and send to mock-mq:"
					+ message);
			msgHolder.setMsg(message);
		});
		//返回实现unbinding类
		return () -> {
			System.out.println("Unbinding");
		};
	}
}
```

binder的配置类

```java
//binder的配置类。
@Configuration
//@ConditionalOnMissingBean(Binder.class)
public class LiujunMessageChannelBinderConfiguration {
@Bean
LiujunMessageChannelBinder liujunMessageChannelBinder() {
	LiujunMessageChannelBinder binder = new LiujunMessageChannelBinder();
	return binder;
}
}
```

建一个meta-inf文件夹,里面写一个spring.binders文本文件,只有一句,指出binders的配置类。

```properties
liujunmq:\
com.gupao.springcloudstream.rabbitmq.stream.LiujunMessageChannelBinderConfiguration
```




## 3.2 channel接口类并放在@enableBinding中

```java
public interface MessageSource {
//发消息
@Output("liujunTopic")
MessageChannel liujunTopic();
//收
@Input("liujunRevc")
MessageChannel liujunRevc();
}
```


​ 特意排除了binder的配置类。

```java
@SpringBootApplication
@ComponentScan(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {LiujunMessageChannelBinderConfiguration.class}))
@EnableBinding({MessageSource.class})
public class SpringCloudStreamApplication {
	public static void main(String[] args) {
		SpringApplication.run(SpringCloudStreamApplication.class, args);
	}
}
```

application.propeties文件中增加这些,表示两个通道都被绑定到我自己加的binder上。spring.binders中有`liujunmq`

```properties
spring.cloud.stream.bindings.liujunTopic.binder=liujunmq
spring.cloud.stream.bindings.liujunRevc.binder=liujunmq
```




## 3.3 发送、接收及模拟的mq

​ 在一个controller中,装配一个特定的发送通道,把http请求中的msg发出去。

```java
@Autowired
@Qualifier("liujunTopic") // Bean 名称
private MessageChannel liujunMessageChannel;

@GetMapping("/message/sendLiujun")
public Boolean sendLiujun(@RequestParam String message) {
	System.out.println("1. msg received through the httpClient...");
	liujunMessageChannel.send(MessageBuilder.withPayload(message).build());
    return true;
}
```

​ 两种方式消费,指定所用的Input通道名字。

```java
@Component
public class LiujunMessageConsumerBean {
    @Autowired
    private MessageSource messageSource;
//    @StreamListener("liujunRevc")
//    public void onMessage(String message){
//        System.out.println("5. targetOutput Listener get msg: " + message);
//    }
    @ServiceActivator(inputChannel = "liujunRevc")
    public void onMessage(Object message) {
        System.out.println("6. targetOutput another Listener get msg: " + message);
    }
}
```


```java
//模拟的mq
@Component
public class MsgHolder {
	public Message msg;
	public MsgListener msgListener;
	public Message getMsg() {
		return msg;
	}
    //设置消息时(收到)后,又把消息给接收者(消费)
	public void setMsg(Message msg) {
		this.msg = msg;
		System.out.println("3. mock-mq get msg and tell the mqClientlistener...");
		msgListener.OnMsg(msg);
		msg=null;
	}
    ...
}
```




## 3.4 运行结果

浏览器输入:[http://192.168.1.6:8080/message/sendLiujun?message=herriman目]

```verilog
2020-01-08 23:01:29.683  INFO 12984 --- [6SvEgzY8IDiA-61] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@5d4b0dbd: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
1. msg received through the httpClient...
2. Input subscriber get the msg and send to mock-mq:GenericMessage [payload=herriman目, headers={id=67fb81dc-ed14-5850-f2d1-499b25d055c7, timestamp=1578495697875}]
3. mock-mq get msg and tell the mqClientlistener...
4. mqClient listener got the msg from mock-mq,then sent to targetOutput!
6. targetOutput another Listener get msg: herriman目
```


    按照设计,正确的输出了每一步的日志。通过本次学习,兼容多种产品学到了一招,在springcloud bus中也是,一个应用产生了event,又自己的容器监听到再发给对方,对方收到后,产生一个event,再由自己的监听处理。即兼容了本地的机制,又兼顾了远程传输。

    另外就是如何把你要的类,比如本例中的channel实例都加载到容器中,binder实例如何加载到容器中。而关联功能又是一个类负责,职责明确。另外还有两个类在正确的容器时机,利用关联类进行绑定与解除的处理。所有设计功能,职责,时机都很有讲究。


相关标签: springcloud stream