Skip to content

TTL消息(过期消息)

过期消息也叫TTL(Time To Live)消息。消息的过期时间有两种设置方式:单条消息过期、通过队列属性设置消息过期

过期的消息叫死消息、死信

单条消息过期

单条消息过期参考:

java
// 正常 new
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("15000"); // 设置15秒过期时间 单位毫秒
Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties)
    
// 建造者模式
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("20000"); // 设置过期时间 单位毫秒
Message message = MessageBuilder
        .withBody(msg.getBytes(StandardCharsets.UTF_8))
        .andProperties(messageProperties) // 消息属性
        .build();

单条消息设置过期时间决定了在没有消费者消费消息的时候,这条消息可以存活多长时间。在以上案例中这条消息如果没有在15秒内被消费它将被删除

队列消息过期

队列消息过期参考:

java
// 正常 new
Map<String, Object> properties = new HashMap<>();
properties.put("x-message-ttl", 10000); // 设置队列消息过期时间
// 参数:队列名 - 是否持久化 - 是否排他队列 - 是否自动删除 - 参数
Queue queue = new Queue("queue.a", true, false, false, properties);

// 建造者模式
Map<String, Object> properties = new HashMap<>();
properties.put("x-message-ttl", 10000); // 设置队列消息过期时间
Queue queue = QueueBuilder
    .durable("queue.a")
    .withArguments(properties)
    .build();

队列过期时间决定了在没有任何消费者的情况下队列内的消息能够存活多长时间

如果队列消息过期和单条消息过期同时设置,那么消息的TTL以两者之间时间较小的时间为准

DLX 死信队列

DLX:Dead Letter Exchange;死信队列也叫死信交换机、死信邮箱等

image-20240810182003650

在消息过期、队列过期、队列消息达到最大长度、消费者拒绝消息不进行重新投递、消费者拒绝消息这几个情况就会进入死信交换机

java
@Configuration
public class RabbitConfig {
    private static final String NORMAL_EXCHANGE_NAME = "exchange.order"; // 正常交换机
    private static final String NORMAL_QUEUE_NAME = "queue.order"; // 正常队列
    private static final String NORMAL_BINDING_NAME = "order"; // 正常绑定
    private static final String DLX_EXCHANGE_NAME = "exchange.order.dlx"; // 死信交换机
    private static final String DLX_QUEUE_NAME = "queue.order.dlx"; // 死信队列
    private static final String DLX_BINDING_NAME = "order.dlx"; // 死信绑定

    @Bean   // 正常交换机
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange(NORMAL_EXCHANGE_NAME).build();
    }

    @Bean   // 正常队列
    public Queue orderQueue() {
        return QueueBuilder
                .durable(NORMAL_QUEUE_NAME)
                .withArgument("x-message-ttl", 20000)  // 设置队列过期时间(毫秒)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)  // 设置队列的死信交换机
                .withArgument("x-dead-letter-routing-key", DLX_BINDING_NAME)  // 设置死信队列的路由键
                .build();
    }

    @Bean   // 正常绑定
    public Binding orderBinding(DirectExchange directExchange, Queue orderQueue) {
        return BindingBuilder.bind(orderQueue).to(directExchange).with(NORMAL_BINDING_NAME);
    }

    @Bean   // 死信交换机
    public DirectExchange dlxExchange() {
        return ExchangeBuilder.directExchange(DLX_EXCHANGE_NAME).build();
    }

    @Bean   // 死信队列
    public Queue dlxQueue() {
        return QueueBuilder.durable(DLX_QUEUE_NAME).build();
    }

    @Bean   // 死信绑定
    public Binding dlxBinding(DirectExchange dlxExchange, Queue dlxQueue) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_BINDING_NAME);
    }
}

死信交换机本质上就是一个被赋予了特殊意义的普通的交换机

在以上的配置中普通交换机就是一个生产者,队列中过期的消息都会被送到死信交换机中而促使他们相互联系的关键就是20和21行的配置,这两行分别配置了死信交换机的名字和死信队列的路由key,确保过期的消息能够进入队列

除了20、21的写法也可以创建一个Map集合编写配置或者使用以下方式

java
return QueueBuilder
        .durable(NORMAL_QUEUE_NAME)
        .ttl(20000)  // 设置队列过期时间(毫秒)
        .deadLetterExchange(DLX_EXCHANGE_NAME)  // 设置队列的死信交换机
        .deadLetterRoutingKey(DLX_BINDING_NAME)  // 设置死信队列的路由键
        .build();

以上的内容为队列消息过期配置,单条消息过期夜差不多

队列达到最大长度

当队列达到最大长度的时候,此时再有一条消息入队时最先入队的消息会被发送到DLX也就是先进先出

java
return QueueBuilder
        .durable(NORMAL_QUEUE_NAME)
        .withArgument("x-message-ttl", 20000)  // 设置队列过期时间(毫秒)
        .maxLength(1000)  // 设置队列最大长度
        .withArgument("x-max-length", 1000)  // 设置队列最大长度
        .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)  // 设置队列的死信交换机
        .withArgument("x-dead-letter-routing-key", DLX_BINDING_NAME)  // 设置死信队列的路由键
        .build();

4行和5行要其中一个即可

消费者拒绝消息不进行重新投递

从正常队列接受消息,但是不对消息进行确认,并且不对消息进行重新投递,此时这条消息就进入的死信队列

首先需要再 application.yml 中配置为手动确认消息:

yaml
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 开启消费者手动确认

示例:

java
// 监听某个队列的方法
public void receiveMessage(Message msg, Channel channel) {
    MessageProperties messageProperties = msg.getMessageProperties();  // 获取消息的属性
    long deliveryTag = messageProperties.getDeliveryTag();  // 获取消息的唯一标识

    try {
        // 处理业务逻辑
        log.info("正常消息:{}", new String(msg.getBody(), StandardCharsets.UTF_8));
        /*
         * 如果一切正常,手动应答告知服务器删除消息
         * 参数:消息唯一标识,是否批量应答
         * 批量答应 - true - 该消息前面没有确认的消息都会被确认
         * 批量答应 - false - 只确认当前消息
         * */
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        log.error("异常消息:{}", new String(msg.getBody(), StandardCharsets.UTF_8));
        try {
            /*
             * 如果业务出现了问题,那么当前消息就不能被删除
             * 参数:消息唯一标识,是否批量应答,是否重新入队
             * 批量答应 - true - 该消息前面没有确认的消息都会被确认
             * 批量答应 - false - 只确认当前消息
             * 重新入队 - true - 该消息会被重新放入队列
             * 重新入队 - false - 消息会被丢弃,如果存在死信队列那么会放入死信队列
             * */
            channel.basicNack(deliveryTag, false, false);
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        throw new RuntimeException(e);
    }
}

在以上情况当业务出现了问题然后拒绝确认消息并拒绝重新入队那么在没有配置死信队列的情况下该消息会被丢弃

消费者拒绝消息

就是更改上面的代码中 catch 内的的拒绝方法

java
catch (Exception e) {
    log.error("异常消息:{}", new String(msg.getBody(), StandardCharsets.UTF_8));
    try {
        /*
         * 如果业务出现了问题,那么当前消息就不能被删除
         * 参数:消息唯一标识,是否重新入队
         * 重新入队 - true - 该消息会被重新放入队列
         * 重新入队 - false - 消息会被丢弃,如果存在死信队列那么会放入死信队列
         * */
        channel.basicReject(deliveryTag, false);
    } catch (IOException ex) {
        throw new RuntimeException(ex);
    }
    throw new RuntimeException(e);
}

basicNack 跟上面的 basicReject 区别就在于第二个参数,是否批量处理

延迟队列

延迟队列有多种实现方式:

  1. 定时任务方式
    • 优点:简单、易实现
    • 缺点:延时时间不准确(如果5分钟扫描一次,那么就可能存在5分钟延迟)、如果数据量大那么性能较差
  2. JDK延迟队列(只适用于单体应用)
    • DelayedQueue - 无阻塞队列,该队列只有在延迟期满之后再能从中获取元素
    • 优点:简单、任务延迟低
    • 缺点:重启服务数据会丢失、不适合集群、数据量大可能会发生OOM(内存不足)异常
  3. 使用消息中间件(RabbitMQ)
    • RabbitMQ本身不支持延迟队列,但是使用TTL结合DLX的方式就可以实现延迟队列
    • 流程跟死信队列的流程图差不多

消息中间件(RabbitMQ)

在上面的死信队列中配置类可以不配置两个交换机,一个交换机也可以正常使用

image-20240811210654645

java
public static final String NORMAL_EXCHANGE_NAME = "exchange.order"; // 正常交换机
public static final String NORMAL_QUEUE_NAME = "queue.order"; // 正常队列
public static final String NORMAL_BINDING_NAME = "order"; // 正常绑定
public static final String DLX_QUEUE_NAME = "queue.order.dlx"; // 死信队列
public static final String DLX_BINDING_NAME = "order.dlx"; // 死信绑定

@Bean   // 正常交换机
public DirectExchange directExchange() {
    return ExchangeBuilder.directExchange(NORMAL_EXCHANGE_NAME).build();
}

@Bean   // 正常队列
public Queue orderQueue() {
    return QueueBuilder
            .durable(NORMAL_QUEUE_NAME)
            .ttl(15000)  // 设置队列的TTL(超时时间)(毫秒)
            .deadLetterExchange(NORMAL_EXCHANGE_NAME)  // 设置死信消息去的交换机
            .deadLetterRoutingKey(DLX_BINDING_NAME)  // 设置死信队列的路由键
            .build();
}

@Bean   // 正常绑定
public Binding orderBinding(DirectExchange directExchange, Queue orderQueue) {
    return BindingBuilder.bind(orderQueue).to(directExchange).with(NORMAL_BINDING_NAME);
}

@Bean   // 死信队列
public Queue dlxQueue() {
    return QueueBuilder.durable(DLX_QUEUE_NAME).build();
}

@Bean   // 死信绑定
public Binding dlxBinding(DirectExchange directExchange, Queue dlxQueue) {
    return BindingBuilder.bind(dlxQueue).to(directExchange).with(DLX_BINDING_NAME);
}

过期时间问题

image-20240812211224086

上图是一个队列,队列遵循先进先出的原则。假设第五格有一条消息过期时间20s,第四格也有一条消息过期时间5s,那么遵循先进先出的原则,如果第五格的消息没有被消费那么第四格的消息就永远不会被消费,那么如果第五格在15s才被消费那么第四格的数据早已死信

解决方法:让过期时间相同的消息进入同一个队列

image-20240812212001635

延迟插件

首先得去 https://www.rabbitmq.com/community-plugins 网站下载 rabbitmq_delayed_message_exchange 插件

注意:需要跟安装的RabbitMQ版本相对应

Docker安装

sh
# 首先需要把文件下载到本地然后传入服务器上

# 把插件复制到容器内
docker cp /tools/rabbitmq_delayed_message_exchange-3.13.0.ez [容器名|ID]:/plugins

# 启用插件
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 看到下方内容方为安装完成
Enabling plugins on node rabbit@b6e0c776fe17:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_prometheus
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@b6e0c776fe17...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

started 1 plugins.

如果安装失败爆出以下错误

sh
** (CaseClauseError) no case clause matching: {:badrpc, {:EXIT, {:aborted, {:node_not_running, :rabbit@ed7e13947dbd}}}}
    (rabbitmqctl 3.13.0-dev) lib/rabbitmq/cli/plugins/plugins_helpers.ex:217: RabbitMQ.CLI.Plugins.Helpers.update_enabled_plugins/2
    (rabbitmqctl 3.13.0-dev) lib/rabbitmq/cli/plugins/plugins_helpers.ex:112: RabbitMQ.CLI.Plugins.Helpers.update_enabled_plugins/4
    (rabbitmqctl 3.13.0-dev) lib/rabbitmq/cli/plugins/commands/enable_command.ex:127: anonymous fn/6 in RabbitMQ.CLI.Plugins.Commands.EnableCommand.do_run/2
    (elixir 1.17.2) lib/stream.ex:1593: anonymous fn/2 in Stream.iterate/2
    (elixir 1.17.2) lib/stream.ex:1820: Stream.do_unfold/4
    (elixir 1.17.2) lib/stream.ex:1891: Enumerable.Stream.do_each/4
    (elixir 1.17.2) lib/stream.ex:1052: Stream.do_transform_inner_enum/7
    (elixir 1.17.2) lib/stream.ex:1891: Enumerable.Stream.do_each/4

{:case_clause, {:badrpc, {:EXIT, {:aborted, {:node_not_running, :rabbit@ed7e13947dbd}}}}}

在3.13.6的版本中是因为在WEB界面中Admin->Feature Flags开启了 khepri_db 选项

image-20240814085115336

应用

image-20240814155736176

消息发送之后不会直接投递到队列,而是存储到了Mnesia(嵌入式数据库),检查 x-delay 时间(消息头部)

解决了消息过期时间不一致所出现的问题

延迟插件需要在 RabbitMQ 3.5.7 及以上才支持,依赖 Erlang/OPT 18.0 及以上运行环境

Mnesia 是一个小型数据库,不太适合存储大量的延迟消息

配置:

java
public static final String NORMAL_EXCHANGE_NAME = "exchange.order"; // 交换机
public static final String NORMAL_QUEUE_NAME = "queue.order"; // 队列
public static final String NORMAL_BINDING_NAME = "plugin"; // 绑定

@Bean   // 自定义交换机
public CustomExchange customExchange() {
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-delayed-type", "direct"); // 设置参数*
    /*
     * 自定义交换机
     * 参数:交换机名称 - 交换机类型 - 是否持久化 - 是否自动删除 - 其他参数
     * */
    return new CustomExchange(NORMAL_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}

@Bean   // 队列
public Queue orderQueue() {
    return QueueBuilder.durable(NORMAL_QUEUE_NAME).build();
}

@Bean   // 绑定
public Binding orderBinding(CustomExchange customExchange, Queue orderQueue) {
    return BindingBuilder.bind(orderQueue).to(customExchange).with(NORMAL_BINDING_NAME).noargs();
}

案例:

java
@Resource
private RabbitTemplate rabbitTemplate;

@GetMapping("/message") // 发送消息
public void sendMessage(String msg) {
    {
        Message message = MessageBuilder
                .withBody(msg.getBytes(StandardCharsets.UTF_8))
                .setHeader("x-delay", "15000") // 不再使用 ttl 设置过期时间
                .build();
        rabbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE_NAME, RabbitConfig.NORMAL_BINDING_NAME, message);
        log.info("发送消息成功:{}", msg);
    }
    {
        Message message = MessageBuilder
                .withBody(msg.getBytes(StandardCharsets.UTF_8))
                .setHeader("x-delay", "10000")
                .build();
        rabbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE_NAME, RabbitConfig.NORMAL_BINDING_NAME, message);
        log.info("发送消息成功:{}", msg);
    }
}

// 监听正常队列
@RabbitListener(queues = RabbitConfig.NORMAL_QUEUE_NAME)
public void receiveMessage(Message msg) {
    log.info("正常消息:{}", new String(msg.getBody(), StandardCharsets.UTF_8));
}