使用RabbitMQ进行消息传输有以下几个地方可能会丢失数据:
- 1: 投递失败,即
producer
→RabbitMQ
的传输过程中发生意外,RabbitMQ
没拿到消息。 - 2:
RabbitMQ
自己把消息弄丢了 - 3: 消费失败,包含
RabbitMQ
→consumer
的传输过程中发生意外,以及consumer
在处理消息的时候发生异常。
对于第三点,网上有些文章的观点是只要消费方成功收到消息就是算完成可靠传输
消息传输的过程如下图所示
解决办法
借用一张别人的图,原生RabbitMQ
的解决方案:
1 确保消息投递成功(confirm模式)
1 | # 让连接池支持publisher confirms |
题外话:如果想处理消息发送的确认事件(比如消息确认发送成功后执行某个逻辑),则可以通过confirmAckChannel
指定一个通道的名称,然后在程序中去监听这个通道。
1 | spring.cloud.stream.rabbit.bindings.<channelName>.producer.confirm-ack-channel=XXX |
confirmAckChannel
:WhenerrorChannelEnabled
is true, a channel to which to send positive delivery acknowledgments (aka publisher confirms). If the channel does not exist, aDirectChannel
is registered with this name. The connection factory must be configured to enable publisher confirms. Default:nullChannel
(acks are discarded).
对应的配置为
1 | spring: |
2 确保消息被RabbitMQ
妥善保存(开启持久化)
相关参数默认就是开始持久化的,不用特别配置
发送端
1 | # 默认就是 true |
durableSubscription
:Whether the subscription should be durable. Only effective ifgroup
is also set. Default:true
.
exchangeDurable
:IfdeclareExchange
is true, whether the exchange should be durable (that is, it survives broker restart). Default:true
.
消费端
1 | # 默认就是 PERSISTENT |
deliveryMode
:The delivery mode. Default:PERSISTENT
.
3 确保消费方完成消息处理(ACK确认)
消费端
1 | # 默认就是 AUTO |
acknowledgeMode
: The acknowledge mode. Default:AUTO
.
4 备注
durableSubscription
的用处
原生的RabbitMQ 是单独对队列进行持久化配置,Spring Cloud Stream
是通过durableSubscription
来设置,相关代码如下:
RabbitExchangeQueueProvisioner.doProvisionConsumerDestination
1 | boolean durable = !anonymous && properties.getExtension().isDurableSubscription(); |
关于acknowledgeMode.AUTO
org.springframework.amqp.core.AcknowledgeMode
1 | public enum AcknowledgeMode { |
也就是说,只要你的处理逻辑不抛异常,框架就会自动发送ack
,反之则发送nack
。
总结:默认情况下只需要让连接池支持publisher confirms就行了
application.yml
1 | spring: |