day07-MQ 高级

YangeIT大约 41 分钟高级服务框架消息可靠性死信交换机惰性队列MQ集群

day07-MQ 高级

目标

  • 消息可靠性
    • 能实现RabbitMQ的生产者重试机制 🍐
    • 能实现RabbitMQ的生产者确认机制 ❤️
    • 能实现RabbitMQ的持久化 🍐
    • 能实现RabbitMQ的消费失败重试❤️
    • 能使用RabbitMQ的各种消费失败重试策略❤️
    • 能理解业务幂等性
  • 消息延迟发送
    • 能理解死信交换机的作用和延迟消息❤️
    • 能实现RabbitMQ的DelayExchange实现延迟队列
    • 超时订单问题

消息队列在使用过程中,面临着很多实际问题需要思考:

image-20210718155003157
image-20210718155003157
  1. 可靠性:

1.发送者的可靠性

发送者的可靠性

首先,我们一起分析一下消息丢失的可能性有哪些。

消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的: image 消息从生产者到消费者的每一步都可能导致消息丢失:

  • 发送消息时丢失:
    • 生产者发送消息时连接 MQ 失败
    • 生产者发送消息到达 MQ 后未找到 Exchange
    • 生产者发送消息到达 MQ 的 Exchange 后,未找到合适的 Queue
    • 消息到达 MQ 后,处理消息的进程发生异常
  • MQ 导致消息丢失:
    • 消息到达 MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

综上,我们要解决消息丢失问题,保证 MQ 的可靠性,就必须从 3 个方面入手:

  • 确保生产者一定把消息发送到 MQ
  • 确保 MQ 不会将消息弄丢
  • 确保消费者一定要处理消息

这一章我们先来看如何确保生产者一定能把消息发送到 MQ。

总结

课堂作业1

  1. 保证 MQ 的可靠性,主要从哪些方面入手?🎤

1.1.生产者重试机制

生产者重试机制

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与 MQ 的连接中断。

为了解决这个问题,SpringAMQP 提供的消息发送时的重试机制。即:当 RabbitTemplate 与 MQ 连接超时后,多次重试。

修改 publisher 模块的 application.yaml 文件,添加下面的内容:

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 2 # 失败后下次的等待时长倍数,下次等待时长 = multiplier * last-interval(上次等待时长)
        max-attempts: 4 # 最大重试次数

 
 
 
 
 
 
 
 

我们利用命令停掉 RabbitMQ 服务:

docker stop mq

然后测试发送一条消息,会发现会每隔 1 秒重试 1 次,总共重试了 4 次。消息发送的超时重试机制配置成功了!

image
image

注意 :当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过 SpringAMQP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(如:200ms)和重试次数(如2次),当然也可以考虑使用异步线程来执行发送消息的代码。

如使用hutool工具包中的异步工具:

总结

课堂作业2

  1. 生产者发送消息时,出现了网络故障,导致与 MQ 的连接中断,有什么解决方案,应该注意什么?🎤

1.2.生产者确认机制

生产者确认机制

一般情况下,只要生产者与 MQ 之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

不过,在少数情况下,也会出现消息发送到 MQ 之后丢失的现象,比如:

  • MQ 内部处理消息的进程发生了异常
  • 生产者发送消息到达 MQ 后未找到 Exchange
  • 生产者发送消息到达 MQ 的 Exchange 后,未找到合适的 Queue,因此无法路由

针对上述情况,RabbitMQ 提供了生产者消息确认机制,包括 Publisher ConfirmPublisher Return 两种。在开启确认机制的情况下,当生产者发送消息给 MQ 后,MQ 会根据消息处理的情况返回不同的回执

具体如图所示:

流程如下:

  • 当消息投递到 MQ,但是路由失败时,通过 Publisher Return 返回异常信息,同时返回 ack 的确认信息,代表投递成功
  • 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知投递成功
  • 持久消息投递到了 MQ,并且入队完成持久化,返回 ACK ,告知投递成功
  • 其它情况都会返回 NACK,告知投递失败

Consumer Acknowledgements 简称 ack 用户确认书

其中:

  • acknack 属于 Publisher Confirm 机制,ack 是投递成功;nack 是投递失败。
  • return 则属于 Publisher Return 机制。

总结

  • 当消息投递到 MQ,但是路由失败时,通过 Publisher Return 返回异常信息,同时返回 ack 的确认信息,代表投递成功。如:routingkey对不上
  • 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知投递成功
  • 持久消息open in new window投递到了 MQ,并且入队完成持久化,返回 ACK ,告知投递成功
  • 其它情况都会返回 NACK,告知投递失败

Consumer Acknowledgements 简称 ack 用户确认书

其中:

  • acknack 属于 Publisher Confirm 机制,ack 是投递成功;nack 是投递失败。
  • return 则属于 Publisher Return 机制。

默认两种机制都是关闭状态,需要通过配置文件来开启。

课堂作业3

  1. 哪些业务场景需要用到生产者确认机制?🎤
  2. 临时消息和持久消息的区别?🎤
  3. Publisher Return机制是什么意思?

1.3.实现生产者确认

实现生产者确认

图解:

代码操作

1.开启生产者确认

publisher模块application.yaml 中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型   
    publisher-returns: true # 开启publisher return机制

这里 publisher-confirm-type 有三种模式可选:

  • none:关闭 confirm 机制
  • simple:同步阻塞等待 MQ 的回执
  • correlated:MQ 异步回调返回回执

一般我们推荐使用 correlated,回调机制。简单实用

注意

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型   
    publisher-returns: true # 开启publisher return机制  默认不开启,为false

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了

总结

课堂作业4

  1. 每个 RabbitTemplate 只能配置几个 ReturnCallback,一般在哪里配置? 主要用来处理什么业务
  2. 项目中ConfirmCallback 需要顶级几次,定义在哪?用来处理什么业务

2. MQ 的可靠性

消息到达 MQ 以后,如果 MQ 不能及时保存,也会导致消息丢失,所以 MQ 的可靠性也非常重要。 🎯

2.1.数据持久化

数据持久化

为了提升性能,默认情况下 MQ 的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化
image
image

我们以控制台界面为例来说明。

操作

1.交换机持久化

在控制台的 Exchanges 页面,添加交换机时可以配置交换机的 Durability 参数:

设置为 Durable 就是持久化模式,Transient 就是临时模式。

  • 持久化模式: 持久化保存会dao在RabbitMQ宕机或者重启专后,未消费的消息仍然属存在,
  • 临时模式: 即时保存在RabbitMQ宕机或者重启后交换机会不存在
image
image

总结

课堂作业5

  1. 为了保证数据的可靠性,必须配置数据持久化,主要包括哪些?🎤
  2. 如果消息持久化,但是队列临时的话,消息能持久码?🎤

2.2.LazyQueue

LazyQueue

在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ 的内存占用就会越来越高,直到触发内存预警上限。此时 RabbitMQ 会将内存消息刷到磁盘上,这个行为成为 PageOut. PageOut 会耗费一段时间,并且会阻塞队列进程。因此在这个过程中 RabbitMQ 不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从 RabbitMQ 的 3.6.0 版本开始,就增加了 Lazy Queues 的模式,也就是惰性队列

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在 3.12 版本之后,LazyQueue 已经成为所有队列的默认格式。因此官方推荐升级 MQ 为 3.12 版本或者所有队列都设置为 LazyQueue 模式。

代码操作

基本步骤

  1. 控制台配置 Lazy 模式
  2. 代码配置 Lazy 模式
  3. 更新已有队列为 lazy 模式
  4. 对比测试

1.控制台配置 Lazy 模式

在添加队列的时候,添加 x-queue-mod=lazy 参数即可设置队列为 Lazy 模式:

总结

课堂作业6

  1. lazy模式有什么特点?🎤

3.消费者的可靠性

背景

当 RabbitMQ 向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常
  • ...

一旦发生上述情况,消息也会丢失。因此,RabbitMQ 必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。

但问题来了:RabbitMQ 如何得知消费者的处理状态呢?

本章我们就一起研究一下消费者处理消息时的可靠性解决方案。

3.1.消费者确认机制

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ 从队列中删除该消息
  • nack:消息处理失败,RabbitMQ 需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息
image
image

一般 reject 方式用的较少,除非是消息格式有问题,那就是开发问题了。

因此大多数情况下我们需要将消息处理的代码通过 try catch 机制捕获,消息处理成功时返回 ack,处理失败时返回 nack.

由于消息回执的处理代码比较统一,因此 SpringAMQP 帮我们实现了消息确认。并允许我们通过配置文件设置 ACK 处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用 api,发送 ackreject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack. 当业务出现异常时,根据异常判断返回不同结果 简单好用

    • 如果是业务异常,会自动返回 nack
    • 如果是消息处理或校验异常,自动返回 reject;

返回 Reject 的常见异常有:

> Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:
>
> - o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.
> - o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.
> - o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.
> - o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message<Foo> but Message<Bar> is received.
> - java.lang.NoSuchMethodException: Added in version 1.6.3.
> - java.lang.ClassCastException: Added in version 1.6.3.

代码操作

演示流程

  1. 演示none:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。
  2. 演示auto:自动模式

通过下面的配置可以修改 SpringAMQP 的 ACK 处理方式:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做处理

修改 consumer 服务的 SpringRabbitListener 类中的方法,模拟一个消息处理的异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    if (true) {
        throw new MessageConversionException("故意的");
    }
    log.info("消息处理完成");
}

为了测试方便,可以直接在控制台发消息

测试可以发现:当消息处理发生异常时,消息依然被 RabbitMQ 删除了。 🎯

总结

课堂作业7

  1. auto模式有什么特性?🎤

3.2.失败重试机制

失败重试机制

当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次 requeue 到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息 requeue 就会无限循环,导致 mq 的消息处理飙升,带来不必要的压力:

image
image

当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况 Spring 又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 mq 队列。

代码操作

修改 consumer服务的 application.yml 文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启 consumer 服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到 MQ 无限重新投递,而是在本地重试了 3 次
  • 本地重试 3 次以后,抛出了 AmqpRejectAndDontRequeueException 异常。查看 RabbitMQ 控制台,发现消息被删除了,说明最后 SpringAMQP 返回的是 reject
image
image

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会 requeue 到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring 会返回 reject,消息会被丢弃

总结

image
image

课堂作业8

  1. 为什么要设置失败重试机制?🎤

3.3.失败处理策略

失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。

因此 Spring 允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由 MessageRecovery 接口来定义的,它有 3 个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是 RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

image
image

代码操作

1️⃣ 1)在 consumer 服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2️⃣ 2)定义一个 RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码如下:

package com.itheima.consumer.config;

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

总结

课堂作业9

  1. 自定义重试次数耗尽后的消息处理策略,哪种策略能不丢失消息?转入人工处理队列?🎤

3.4.业务幂等性

业务幂等性

何为幂等性?

幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。

在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据 id 删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。

然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ 消息的重复投递

我们在用户支付成功后会发送 MQ 消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。

举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息 ID
  • 业务状态判断

代码操作

1.唯一消息 ID

这个思路非常简单:

  1. 每一条消息都生成一个唯一的 id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息 ID 保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

我们该如何给消息添加唯一 ID 呢?

其实很简单,SpringAMQP 的 MessageConverter 自带了 MessageID 的功能,我们只要开启这个功能即可。

以 Jackson 的消息转换器为例:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

总结

课堂作业10

  1. 日志中有哪些信息?🎤

3.5.兜底方案

兜底方案

虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息 100% 的可靠。万一真的 MQ 通知失败该怎么办呢?

有没有其它兜底方案,能够确保订单的支付状态一致呢?

其实思想很简单:既然 MQ 通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的 MQ 通知失败,我们依然能通过主动查询来保证订单状态的一致。

流程如下:

图中黄色线圈起来的部分就是 MQ 通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

不过需要注意的是,交易服务并不知道用户会在什么时候支付,如果查询的时机不正确(比如查询的时候用户正在支付中),可能查询到的支付状态也不正确。

那么问题来了,我们到底该在什么时间主动查询支付状态呢?

这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔 20 秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。

定时任务大家之前学习过,具体的实现这里就不再赘述了。

至此,消息可靠性的问题已经解决了。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用 MQ 消息通知交易服务,完成订单状态同步。
  • 其次,为了保证 MQ 消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

4.延迟消息

延迟消息

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。

但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!

因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存

例如,订单支付超时时间为 30 分钟,则我们应该在用户下单后的第 30 分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

但问题来了:如何才能准确的实现在下单后第 30 分钟去检查支付状态呢?

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用 MQ 的延迟消息了。

在 RabbitMQ 中实现延迟消息也有两种方案:

  • 死信交换机 +TTL
  • 延迟消息插件

这一章我们就一起研究下这两种方案的实现方式,以及优缺点。

4.1.死信交换机和延迟消息

首先我们来学习一下基于死信交换机的延迟消息方案。

前言

4.1.1.死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用 basic.rejectbasic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过 dead-letter-exchange 属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因 TTL(有效期)到期的消息

4.1.2.延迟消息

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的 RepublishMessageRecoverer 作用类似。

而最后一种场景,大家设想一下这样的场景:

如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是 ttl.queue 没有消费者监听,而是设定了死信交换机 hmall.direct,而队列 direct.queue1 则与死信交换机绑定,RoutingKey 是 blue:

假如我们现在发送一条消息到 ttl.fanout,RoutingKey 为 blue,并设置消息的有效期为 5000 毫秒:

消息肯定会被投递到 ttl.queue 之后,由于没有消费者,因此消息无人消费。5 秒之后,消息的有效期到期,成为死信:

死信被再次投递到死信交换机 hmall.direct,并沿用之前的 RoutingKey,也就是 blue

由于 direct.queue1hmall.direct 绑定的 key 是 blue,因此最终消息被成功路由到 direct.queue1,如果此时有消费者与 direct.queue1 绑定, 也就能成功消费消息了。但此时已经是 5 秒钟以后了:

也就是说,publisher 发送了一条消息,但最终 consumer 在 5 秒后才收到消息。我们成功实现了延迟消息

代码操作

总结

课堂作业11

  1. 日志中有哪些信息?🎤

4.2.DelayExchange插件

DelayExchange插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此 RabbitMQ 社区提供了一个延迟消息插件来实现相同的效果。

官方文档说明:

代码操作

4.2.1.下载

插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchangeopen in new window

由于我们安装的 MQ 是 3.8 版本,因此这里下载 3.8.17 版本:

当然,也可以直接使用课前资料提供好的插件:

注意

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。 因此,不建议设置延迟时间过长的延迟消息。

总结

课堂作业12

  1. 延迟消息插件的消息是存在哪里? 可以存很长的延迟时间吗?🎤

4.3.超时订单问题

超时订单问题

接下来,我们就在交易服务中利用延迟消息实现订单超时取消功能。其大概思路如下:

假如订单超时支付时间为 30 分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为 30 分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。

但是大多数情况下用户支付都会在 1 分钟内完成,我们发送的消息却要在 MQ 中停留 30 分钟,额外消耗了 MQ 的资源。因此,我们最好多检测几次订单支付状态,而不是在最后第 30 分钟才检测。

例如:我们在用户下单后的第 10 秒、20 秒、30 秒、45 秒、60 秒、1 分 30 秒、2 分、...30 分分别设置延迟消息,如果提前发现订单已经支付,则后续的检测取消即可。

这样就可以有效避免对 MQ 资源的浪费了。

优化后的实现思路如下:

代码操作

由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体,处于通用性考虑,我们将其定义到 hm-common 模块下:

代码如下:

package com.hmall.common.domain;

import com.hmall.common.utils.CollUtils;
import lombok.Data;

import java.util.List;

@Data
public class MultiDelayMessage<T> {
    /**
     * 消息体
     */
    private T data;
    /**
     * 记录延迟时间的集合
     */
    private List<Long> delayMillis;

    public MultiDelayMessage(T data, List<Long> delayMillis) {
        this.data = data;
        this.delayMillis = delayMillis;
    }
    public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){
        return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
    }

    /**
     * 获取并移除下一个延迟时间
     * @return 队列中的第一个延迟时间
     */
    public Long removeNextDelay(){
        return delayMillis.remove(0);
    }

    /**
     * 是否还有下一个延迟时间
     */
    public boolean hasNextDelay(){
        return !delayMillis.isEmpty();
    }
}

5.作业

5.1.取消订单

在处理超时未支付订单时,如果发现订单确实超时未支付,最终需要关闭该订单。

关闭订单需要完成两件事情:

  • 将订单状态修改为已关闭
  • 恢复订单中已经扣除的库存

这部分功能尚未实现。

大家要在 IOrderService 接口中定义 cancelOrder 方法:

void cancelOrder(Long orderId);

并且在 OrderServiceImpl 中实现该方法。实现过程中要注意业务幂等性判断。