Spring Cloud Stream Reference Guide(Ditmars.RELEASE)的阅读笔记
相关内容
- 第四章:绑定器(Binder)
- 第五章:配置选项
文档中到处都有Binding
和Binder
这两个概念,一开始觉得有点晕头转向,推敲之后发现:
- Binding 指绑定的过程,和Binding有关参数就是在执行绑定过程中需要用到的参数.
- Binder 指执行绑定过程的对象,如Rabbit Binder,Kafka Binder.
4 绑定器(Binder)
Binder是框架提供的一层抽象,其具体的实现有RabbitMQ
和Kafka
.Binder接口定义如下:
1 | public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> { |
Binder的探测
Spring Cloud Stream 依靠Spring Boot的自动配置机制处理绑定过程,如果在类路径下有且只有一个Binder的实现,它就会被自动选择并使用.
如果类路径下存在多个Binder的实现,比如:
1 | <dependency> |
这种情况下,应用程序就需要明确指定使用那个Binder的实现,有两种方式:
- 全局方式:
spring.cloud.stream.defaultBinder=binder的名称
,比如1
spring.cloud.stream.defaultBinder=rabbit
- 每个Channel单独指定:
spring.cloud.stream.bindings.input.binder=binder的名称
,比如至于Binder的名字,可以查看Binder的jar包中的1
2spring.cloud.stream.bindings.my-channle-1.binder=kafka
spring.cloud.stream.bindings.my-channle-2.binder=rabbitMETA-INF/spring.binders
文件,内容大致如下:1
2rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration
配置网络信息
一直纳闷,为啥文档全篇都不提及怎么配置网络信息,研究了一下发现它们归Spring Boot管,RabbitMQ的通过:RabbitProperties.java映射,Kafka的通过KafkaProperties.java映射.
相关的配置项可以在common-application-properties查阅.
再结合Connecting to Multiple Systems的内容,总结起来有两种配置模式(以RabbitMQ为例):
- 第一种是全局配置:相关配置信息写在
spring.rabbitmq
下. - 第二种是
spring cloud stream
所支持的多环境独立配置:需要写在spring.cloud.stream.binders.<配置名称>.environment.spring.rabbitmq.*
里面.例子:这个配置为不同的通道(input和output)配备了不同的Broker主机,即两个通道连不同的服务器(Broker),1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23spring:
cloud:
stream:
bindings:
input:
destination: foo
binder: rabbit1
output:
destination: bar
binder: rabbit2
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: <host1>
rabbit2:
type: rabbit
environment:
spring:
rabbitmq:
host: <host2>Spring Cloud Stream
在处理通道绑定的时候,会用spring.cloud.stream.binders.<配置名称>.environment.spring.rabbitmq.
里面的内容创建一个新的RabbitProperties
对象,用这个对象去创建连接工厂(CachingConnectionFactory)
Binder 的配置选项
Binder的配置参数需要写在spring.cloud.stream.binders.<configurationName>
下面.上面那个多环境的配置就是一个例子.
具体的配置项有以下这些:
type
指明Binder的类型,RabbitMQ叫rabbit
,Kafka叫kafka
.Binder会在META-INF/spring.binders
标明自己的type名称.
默认值:这个参数的默认值就是配置名称(
spring.cloud.stream.binders.<configurationName>
的最后一部分)的值.
inheritEnvironment
是否继承来自应用程序的环境信息(environment)
默认值:true.
environment
用于申明自定义Binder环境配置信息
默认值:空
defaultCandidate
是否将此Binder的视为默认Binder的候选者.
默认值:true.
5 配置选项
配置分为两个大类,一类通用配置,另一类是特定中间件的配置.
配置信息可以由Spring Boot支持的任何方式传递给框架(Spring Cloud Stream
),比如命令行,环境变量,Cloud Config,配置文件等.
Spring Cloud Stream 配置项
这部分配置项的前缀是:spring.cloud.stream
.
instanceCount
应用程序的实例数量.
- 默认值: 1
- 如果用的是kafka,并且需要支持分区就一定设置这个参数.
instanceIndex
应用程序实例的索引号,范围是[0,instanceCount).
- 默认值:0
- 用于配置kafka分区,在
Cloud Foundry
环境下框架会自动设置.
dynamicDestinations
动态目的地,列表类型
- 默认值:empty
- 如果设置了,只有列表中的目的地可以绑定,没有设置的话,任何目的地都允许绑定.
可以理解为限制通道目的地的白名单.
defaultBinder
指定默认绑定器
- 默认值:empty
前面
Binder的探测
提到过这个这个参数.
overrideCloudConnectors
覆盖云平台提供的Connector
- 默认值: false
- 这个参数生效需要两个前提条件:
cloud
配置(profile)被激活- 云平台提供了Spring Cloud Connectors(给应用程序).
- false:框架会自动探测绑定服务,并使用该绑定服务创建连接.
- true:框架会忽略绑定服务,根据程序自身的配置(如
spring.rabbitmq.*
)来创建连接.
绑定(Binding)配置项
一般来说,主要就是配置这部分参数,这样才能让消息发往正确的地方,或者从Broker上拿到想要的消息.
配置基于这样一种格式:
1 | spring.cloud.stream.bindings.<channelName>.<property>=<value> |
这部分配置又可以分为三类:
- 通用的配置项
- 消息生产者专用的配置项
- 消息消费者专用的配置项
通用Binging配置项
这部分配置的前缀是spring.cloud.stream.bindings.<channelName>
,channelName表示通道的名称,这个是自己定义的.也就是说需要为每一个通道分别配置.
如果需要声明一些默认配置可以使用spring.cloud.stream.bindings.default.<property>=<value>
.比如:
1 | spring.cloud.stream.default.contentType=application/json |
destination
通道的目的地名称,对于RabbitMQ就是交换机,对于Kafka来说就是topic.
- 默认值:通道的名称
如果此通道是绑定给消费者(consumer)的,这个地方就可以设置多个目的地,用逗号分隔.
group
通道的消费者组名称,只有入站绑定才有效
- 默认值: null(表示匿名消费者)
contentType
内容的类型,比如application/json
- 默认值: null(无强制类型要求)
binder
指定此通道所使用的绑定器.
- 默认值:null(使用默认绑定器,如果不存在会报错)
消费者Binging配置项
这部分配置的前缀是spring.cloud.stream.bindings.<channelName>.consumer
.
如果需要声明一些消费者的默认配置可以使用spring.cloud.stream.default.consumer.<property>=<value>
.比如:
1 | spring.cloud.stream.default.consumer.headerMode=raw |
concurrency
消费的并发性
- 默认值:1
Stack Overflow上找到一个帖子,可以看看有助理解.
partitioned
是否分区(消息来自分区的生产者)
- 默认值:false
headerMode
头部模式.如果设置为raw
,header解析会被禁用.只有在消息中间件自身并不是支持消息头,但是又要求植入消息头的情况下才有效果.
- 默认值:embeddedHeaders
这个参数一般用于消息来自外部消息系统的场景.
maxAttempts
处理消息的最大尝试次数,用于配置失败重试,设为1的就是禁用失败重试(因为这个次数包含第一次处理)
- 默认值:3
backOffInitialInterval
重试时间间隔的初始值(毫秒).
- 默认值:1000
backOffMaxInterval
重试时间间隔的最大值(毫秒).
- 默认值:10000
backOffMultiplier
重试时间间隔的计算倍数(float).
- 默认值:2.0
instanceIndex
这个参数用于设置该消费者对应的实例索引号
- 默认值:-1
通用Binding配置里面也有一个配置实例索引号的配置(
spring.cloud.stream.instanceIndex
),那个地方相当于是全局配置,这里用于对当前的消费者单独配置.如果这个参数的值<0,表示使用全局配置.
instanceCount
和上面那个instanceIndex
的作用机制一样.
- 默认值:-1
生产者Binging配置项
这部分配置的前缀是spring.cloud.stream.bindings.<channelName>.producer
.
如果需要声明一些消费者的默认配置可以使用spring.cloud.stream.default.producer.<property>=<value>
.比如:
1 | spring.cloud.stream.default.producer.partitionKeyExpression=payload.id |
instanceCount
和instanceIndex原理一样.
- 默认值:-1
partitionKeyExpression
声明如何根据消息确定消息分区的键,这个参数的值应该是一个SpEL
表达式.如果配置了此参数或者partitionKeyExtractorClass
的一个,就表示启用了分区(当前通道上的出站数据会被分区).不过,要是分区真正生效,还需要设置partitionCount
>0.
- 默认值:null
这个参数和
partitionKeyExtractorClass
是互相排斥的,只能配一个.
partitionKeyExtractorClass
声明如何根据消息确定消息分区的键,这个参数的值应该是一个PartitionKeyExtractorStrategy
接口的实现类.作用机制和上面的partitionKeyExpression
一样.
- 默认值:null
这个参数和
partitionKeyExpression
是互相排斥的,只能配一个.
partitionSelectorClass
声明如何选择分区,这个参数的值应该是一个PartitionSelectorStrategy
接口的实现类.这个参数需要依赖partitionKeyExpression
或者 partitionKeyExtractorClass
参数来计算分区键.
- 默认值:null
- 这个参数和
partitionSelectorExpression
是互相排斥的,只能配一个.假设这两个参数都没设置,那么分区的选择算法为:hashCode(key) % partitionCount
partitionSelectorExpression
声明如何选择分区,这个参数的值应该是一个SpEL
表达式.其作用机制和上面的partitionSelectorClass
一样.
- 默认值:null
partitionCount
数据的分区数量(如果分区已经开启,见前面partitionKeyExpression
部分).必须是一个>1的值.
- 默认值:1
对于kafka而言,这个参数应该视为一个分区数量的提示,kafka会使用一个比这个更大的值.
requiredGroups
一个逗号分隔的group列表,用于确保消息能够被投递.
因为组(group)对与消费方来说才是必须要声明的,假设生产者比消费者先启动,那么消费者回因为组的声明时机太晚而无法拿到历史消息.rabbit Binder的实现就是检查这个参数,(自己)把队列创建了.
headerMode
和消费者Binging里的同名参数一样.
useNativeEncoding
是否采用原生编码,如果设置为true,消息的编码直接由中间件的原生客户端完成.
- 默认值:false
errorChannelEnabled
是否开启错误通道.这个参数要参考Message Channel Binders and Error Channels.
- 默认值:false
一个rabbit配置样本
1 | spring: |
这个配置实现的效果是: