消息可靠性问题

消息丢失的可能性

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接受到消息后未消费就宕机

根据以上几种可能,我们分别有生产者消息确认,消息持久化,消费者消息确认和消费失败重试机制来解决以上的问题

生产者消息确认

RabbitMQ提供了发送者确认和发送者回执两种机制来确认消息不会在发送过程中丢失

Publisher Confirm(发送者确认)

Publisher Confirm机制是RabbitMQ用来确保消息被成功发送到RabbitMQ服务器(至少是成功到达交换机)的一种机制。当启用这个机制时,RabbitMQ会向生产者(publisher)发送一个确认(ack)或否定确认(nack)来告知消息是否被成功处理。

  • ack(确认):当消息成功到达交换机时,RabbitMQ会向生产者发送一个ack,表示消息已经被正确处理。
  • nack(否定确认):如果RabbitMQ因为内部错误(如资源不足)而无法处理消息,它会发送一个nack给生产者。但需要注意的是,在标准的RabbitMQ行为中,如果消息因为路由键不匹配任何队列而被交换机丢弃,RabbitMQ不会发送nack,而是可能通过publisher return机制来处理。

Publisher Return(发送者回执)

Publisher Return是RabbitMQ提供的另一种机制,用于处理那些无法路由到任何队列的消息。当消息被交换机接收,但由于没有匹配的绑定(即没有队列订阅了消息所使用的路由键)而无法投递到任何队列时,RabbitMQ会将这个消息返回给生产者,并附带一个路由失败的原因。

  • 返回消息:消息本身会被返回给生产者,同时还会附带一个Basic.Return命令,其中包含路由失败的原因(如路由键未找到匹配的队列)。
  • 注意:这里重要的是要理解,publisher return并不与acknack直接相关。ack/nack是确认消息是否到达交换机的,而publisher return是处理那些已经到达交换机但无法被路由到任何队列的消息的。

如何实现

首先添加配置

1
2
3
4
5
6
7
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true

配置说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

配置这两方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//设置confirmCallback 判断是否到达交换机
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 自定义的数据
* @param ack 是否确认
* @param cause 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
// 3.1.ack,消息成功
log.debug("消息发送成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失败
log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), cause);
}
}
});
// 设置ReturnCallback 处理无法路由到队列的消息
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投递失败,记录日志
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有业务需要,可以重发消息
});


}

消息持久化

在写代码过程中,交换机,队列和消息都是默认持久化的,不需要做额外的操作,这里为了更好的理解,将显示调用如何持久化,在实际生产过程中,一下步骤并不需要做

交换机持久化

1
2
3
4
5
@Bean
public DirectExchange simpleExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}

队列持久化

1
2
3
4
@Bean
public Queue simpleQueue(){
return new Queue("simple.queue",true);
}

消息持久化

1
2
3
4
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
.build();

消费者消息确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

•manual:手动ack,需要在业务代码结束后,调用api发送ack。

•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除,此种较为危险,如果消费者获取消息后,在处理消息的过程中发生异常,那么此时消息以及被删除,即消息丢失

配置方式

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 关闭ack

在auto方法中,如果消费者消费消息时出现了异常,消息并不会丢失,而是会重新回到队列中再次发消息,知道消息能够接收成功,所以这里就需要搭配上消息重试机制,否则消息就会一直无限次被发往交换机,对服务器的压力会很大

失败重试机制

开启失败重试

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
  • 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回ack,消息会被丢弃

在以上重试机制下,如果到达了最大重试次数,消息依然会丢失,所以我们要根据失败重试策略来处理

失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。使用死信交换机需要将队列上绑定deadLetterExchangedeadLetterRoutingKey,根据这里我们可以知道,他的效果与全局配置失败消息处理策略差不多,下面我将对比两者的优劣。

死信交换机的劣势和优势

劣势

  1. 配置复杂性:使用死信交换机需要在队列创建时额外配置dead-letter-exchangedead-letter-routing-key属性,增加了配置的复杂性。
  2. 性能开销:当消息被发送到死信交换机时,需要经过额外的路由和存储过程,可能会对系统性能造成一定影响。
  3. 管理复杂性:需要管理额外的死信队列和交换机,增加了系统的管理复杂性。

优势

  1. 灵活性:可以为不同的队列设置不同的死信交换机和死信队列,提供了更高的灵活性。
  2. 可追踪性:所有发送失败的消息都会被收集到死信队列中,便于追踪和定位问题。
  3. 可扩展性:可以根据需要增加死信队列的数量和容量,以应对更高的失败消息量。

配置消息失败处理策略的优势和劣势

劣势

  1. 全局性:该策略是全局性的,可能无法针对特定队列或消息类型进行细粒度的控制。
  2. 局限性:只能实现简单的重试逻辑,无法像死信交换机那样提供多种失败消息处理策略。
  3. 缺乏灵活性:无法根据消息的失败原因或类型来动态选择处理策略。

优势

  1. 配置简单:只需在全局范围内配置一次,即可应用于所有相关的消息处理。
  2. 易于实现:通过实现MessageRecovery接口,可以轻松地定义失败消息的重试逻辑。
  3. 性能影响小:由于不需要额外的队列和交换机,对系统性能的影响相对较小。

综合比较

死信交换机 消息失败处理策略
配置复杂性 高(需要额外配置死信队列和交换机) 低(全局配置)
性能开销 较高(涉及额外的路由和存储) 较低(仅涉及重试逻辑)
管理复杂性 高(需要管理额外的队列和交换机) 低(全局管理)
灵活性 高(可以针对特定队列或消息类型设置) 低(全局性策略)
可追踪性 高(所有失败消息都被收集到死信队列中) 较低(依赖于日志或其他追踪机制)
可扩展性 高(可以增加死信队列数量和容量) 较低(受限于全局配置)

延迟消息

通过死信交换机我们可以达到延迟队列的效果,只需要为需要延迟发放消息的队列设置过期之间或者为消息设置过期时间,并且不给这个队列绑定消费者,到时间过期后,消息就会进入死信交换机,我们将要处理消息的消费者绑定到死信队列上,就可以达到延迟消息的功能,不过rabbitMQ现在推出的有专门做延迟消息的方法,所以这种方法不经常使用

使用 RabbitMQ 插件实现延迟队列

RabbitMQ 社区提供了一个非常方便的插件 rabbitmq-delayed-message-exchange,它允许你直接在交换机级别设置消息的延迟时间,而无需使用死信队列的复杂设置。使用这种方法,你可以:

  1. 安装插件:首先,你需要在 RabbitMQ 服务器上安装 rabbitmq-delayed-message-exchange 插件。
  2. 定义延迟交换机:在 RabbitMQ 管理界面或通过 API 定义一个类型为 x-delayed-message 的交换机。
  3. 发送延迟消息:当发送消息到该交换机时,你可以在消息的头部(header)中指定延迟时间(以毫秒为单位)。
  4. 绑定队列:像普通交换机一样,将队列绑定到这个延迟交换机上。
  5. 消费者监听:消费者监听这个队列,当消息的延迟时间到达后,消息将被发送到队列中,消费者可以像处理普通消息一样处理它。

惰性队列

消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。

解决消息堆积有三种思路:

  • 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
  • 扩大队列容积,提高堆积上限
  • 在消费者内开启线程池加快消息处理速度

根据第二条思路,我们提出了一种解决办法就是引入惰性队列

引入惰性队列

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

由于消息存入的时磁盘,所以消息可存储的量大大提高了,并且普通队列可能会因为内存中的消息过多而需要定期将部分消息写入磁盘以释放内存空间,这个过程称为page-out。而惰性队列由于直接写入磁盘,避免了这种间歇性的page-out操作,从而提高了系统的稳定性。

声明惰性队列

1
2
3
4
5
6
7
8
9
10
11
12
13
// 惰性队列
@Bean
public Queue lazyQueue(){
return QueueBuilder.durable("lazy.queue")
.lazy()
.build();
}
// 普通队列
@Bean
public Queue normalQueue(){
return QueueBuilder.durable("normal.queue")
.build();
}

不过惰性队列也有缺陷,例如性能受限于磁盘IO,基于磁盘存储,消息时效性也会降低

集群

在面对高并发的问题中,我们通常会选择采用集群来解决,在RabbitMQ中的集群有两种模式

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。

  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。

普通集群

优势

  1. 负载均衡:普通集群通过将队列分散到集群的各个节点上,实现了负载均衡,提高了整个集群的并发能力。这有助于优化资源使用,提高系统的响应速度和吞吐量。
  2. 高扩展性:随着业务需求的增长,普通集群可以通过添加更多的服务器节点来轻松扩展其性能和存储能力。这种横向扩展能力使得集群能够适应不断变化的需求。
  3. 高可靠性:虽然普通集群不直接提供数据冗余,但通过将队列分散到多个节点上,可以在一定程度上提高系统的可靠性。当某个节点出现故障时,其他节点仍然可以处理消息。

劣势

  1. 数据可用性风险:普通集群不保证数据的冗余存储,因此当队列所在的节点宕机时,队列中的消息可能会丢失。这增加了数据丢失的风险。
  2. 配置和维护复杂性:管理多个节点和确保它们之间的正确通信需要复杂的配置和维护工作。这可能会增加运维成本和难度。

镜像集群

优势

  1. 高可用性:镜像集群通过在不同节点上创建队列的镜像来提供高可用性。即使某个节点出现故障,其他节点上的镜像队列仍然可以继续处理消息,确保服务的连续性。
  2. 数据可靠性:镜像集群通过同步队列的元数据和消息到多个节点,提高了数据的可靠性。即使某个节点上的数据丢失,也可以从其他节点上的副本中恢复。

劣势

  1. 性能开销:由于需要在多个节点之间同步队列的元数据和消息,镜像集群可能会引入额外的性能开销。这可能会影响系统的响应速度和吞吐量。
  2. 无法线性扩容:镜像集群中的每个节点都包含整个集群的数据副本,这限制了集群的线性扩容能力。当单个节点的存储容量达到上限时,可能需要采取其他措施(如增加单个节点的存储容量或重新设计集群架构)来扩展集群。

仲裁队列

优势

  1. 高一致性:仲裁队列基于Raft共识算法或其变种,确保了数据在多个节点之间的一致性。这种一致性是强一致的,有助于减少数据丢失的风险。
  2. 非阻塞复制:仲裁队列在复制数据时不会阻塞队列的其他操作,这有助于提高系统的可用性和性能。即使某个节点出现故障并重新上线,主副本也可以从从副本中断的地方开始复制消息,而无需同步整个队列的数据。

劣势

  1. 资源消耗:仲裁队列需要消耗更多的内存和磁盘资源来存储数据的多个副本。这可能会增加系统的运行成本。
  2. 适用场景限制:仲裁队列适用于对队列容错和数据安全要求高、对延迟和队列特性要求相对低的场景。在可能出现消息大量堆积的场景中,仲裁队列的写入放大会造成成倍的磁盘占用,因此可能不适合使用。