Skip to content

消息 - Confirm

消息的Confirm其实就是一个确认机制,指的是生产者投递消息之后,消息到达了broker内的交换机上则会给生产者一个应答,生产者根据应答判断消息是否正常的发送到了broker内的交换机上,这确保消息可靠性投递的保障

image-20240814164453721

yml配置:

yaml
spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启生产者确认机制

# 发布者类型确认使用  - publisher-confirm-type 值
# public enum ConfirmType {
#     在范围操作内使用RabbitTemplate#waitForConfirms() (或waitForConfirmsOrDie() 。
#     SIMPLE,
# 	  与CorrelationData一起使用以将确认与已发送的消息关联起来。
#     CORRELATED,
# 	  发布者确认被禁用(默认)
#     NONE
# }

写一个类实现 RabbitTemplate.ConfirmCallback 接口,判断成功失败 ack 结果,如果 ack 为 false 那么对消息进行重新投递或者其他操作

java
@Slf4j
@Component
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    /**
     * 回调方法,确认消息成功或者失败
     *
     * @param correlationData 回调的相关数据。
     * @param ack             true 表示 ack, false 表示 nack
     * @param cause           可选原因,适用于 nack(如果可用),否则为 null。
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送成功:{}", correlationData.getId());
            return;
        }
        // 执行消息发送失败的逻辑
        log.info("消息发送失败:{}", cause);
    }
}

在发送消息页面进行配置关联数据和初始化内容

java
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private MyConfirmCallBack confirmCallBack;

@PostConstruct  // 初始化
public void init() {
    rabbitTemplate.setConfirmCallback(confirmCallBack);
}

@GetMapping("/message") // 发送消息
public void sendMessage(String msg) {
    {
        Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).build();
        CorrelationData correlationData = new CorrelationData();  // 关联数据
        // 这个setId就是设置唯一的id,因为发送的消息可能一样,那就需要设置一个唯一id来跟踪提供者发送的消息
        correlationData.setId("order_1");
        // 采用直连交换机
        rabbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE_NAME, RabbitConfig.NORMAL_BINDING_NAME, message, correlationData);
        log.info("发送消息成功:{}", msg);
    }
}

除了单独创建一个类实现接口之外,还可以有以下的方式:

  1. 使用匿名内部类
  2. 让当前类实现接口
  3. 使用Lambda表达式(因为是函数时接口)

消息 - Return

RabbitMQ消息的传递流程为:producer —> exchange —> queue —> consumer

消息从 producer —> exchange 会返回一个 ConfirmCallback

消息从 exchange —> queue 投递失败会返回一个 ReturnCallback

可以通过这两个 callback 控制消息的可靠传递

开启配置:

yaml
spring:
  rabbitmq:
    publisher-returns: true # 开启生产者返回机制

实现接口

java
@Slf4j
@Component
public class MyReturnCollBack implements RabbitTemplate.ReturnsCallback {
    @Override // 该接口个上面的接口不同的是该接口是在发送消息失败的时候会调用
    public void returnedMessage(ReturnedMessage returned) {
        log.error("消息发送失败 - 交换机:{}", returned.getExchange());
        log.error("消息发送失败 - 路由KEY:{}", returned.getRoutingKey());
        log.error("消息发送失败 - 原因:{}", returned.getMessage());
    }
}

配置类

java
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private MyReturnCollBack returnCollBack;

@PostConstruct  // 初始化
public void init() {
    rabbitTemplate.setReturnsCallback(returnCollBack); // 设置回调函数
}

@GetMapping("/message") // 发送消息
public void sendMessage(String msg) {
    {
        Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).build();
        CorrelationData correlationData = new CorrelationData();  // 关联数据
        correlationData.setId("order_1");
        rabbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE_NAME, RabbitConfig.NORMAL_BINDING_NAME + "1", message, correlationData);
        log.info("发送消息成功:{}", msg);
    }
}

备用交换机

当消息发送失败时会走备用交换机,它和死信队列有点相像,不同的是死信队列主要用于处理消费端的问题,比如消息处理失败;而备用交换机则用于处理生产端的问题,比如消息无法被预期的交换机路由

image-20240815105645339

配置:

java
public static final String NORMAL_EXCHANGE_NAME = "exchange.order"; // 正常交换机
public static final String NORMAL_QUEUE_NAME = "queue.order"; // 正常队列
public static final String NORMAL_BINDING_NAME = "order"; // 正常绑定
public static final String SPARE_EXCHANGE_NAME = NORMAL_EXCHANGE_NAME + ".spare"; // 备用交换机
public static final String SPARE_QUEUE_NAME = NORMAL_QUEUE_NAME + ".spare"; // 备用队列

@Bean   // 正常交换机
public DirectExchange directExchange() {
    return ExchangeBuilder
            .directExchange(NORMAL_EXCHANGE_NAME)
            .alternate(SPARE_EXCHANGE_NAME)  // 绑定备用交换机
            .build();
}

@Bean   // 正常队列
public Queue orderQueue() {
    return QueueBuilder.durable(NORMAL_QUEUE_NAME).build();
}

@Bean   // 正常绑定
public Binding orderBinding(DirectExchange directExchange, Queue orderQueue) {
    return BindingBuilder.bind(orderQueue).to(directExchange).with(NORMAL_BINDING_NAME);
}

@Bean   // 备用交换机
public FanoutExchange spareExchange() {
    return ExchangeBuilder.fanoutExchange(SPARE_EXCHANGE_NAME).build();
}

@Bean   // 备用队列
public Queue spareQueue() {
    return QueueBuilder.durable(SPARE_QUEUE_NAME).build();
}

@Bean   // 备用队列绑定
public Binding spareBinding(FanoutExchange spareExchange, Queue spareQueue) {
    return BindingBuilder.bind(spareQueue).to(spareExchange);
}

详细属性

交换机

name —— 交换机名称

type —— 交换机类型(扇形、直连、主题、头部)

durability —— 是否持久化(代表MQ重启后该交换机是否还会存在)

auto delete —— 是否自动删除(曾经由队列绑定该交换机,后来解绑了,该交换机没有队列绑定那么就会自动删除)

internal —— 是否内部使用(如果为true客户端无法直接发送消息到该交换机,他只能用于交换机与交换机的绑定)

参数 Arguments

alternate-exchange —— 备用交换机

队列

name —— 队列名称

type —— 队列类型

durability —— 是否持久化(代表MQ重启后该队列是否还会存在)

auto delete —— 是否自动删除(跟交换机自动删除差不错,当没有代码监听这个队列的时候该队列就会被删除)

exclusive —— 排他队列(只对首次生命它的连接可见,并且在断开连接时自动删除,一番设置为false)

参数 Arguments

x-expires —— 当该队列在指定时间未被访问将自动删除

x-message-ttl —— 队列过期时间(单位毫秒)

x-overflow —— 设置队列溢出行为,当达到队列最大长度时消息或做什么操作(有效值Drop Head、Reject Publish)

x-single-active-consumer —— 单一消费者,在有三个消费者的情况下所发送的消息轮询的,设置单一消费者之后,不在轮询只往一个消费者发送消息

x-dead-letter-exchange —— 死信交换机

x-dead-letter-routing-key —— 死信路由KEY

x-max-length —— 队列所存消息最大数

x-max-length-bytes —— 队列所存最大字节数,跟上面最大数相似

消息可靠性投递

消息的可靠性投递是要保证消息的投递期间的每一个环节都不会出错,这样必然会损失一些性能

image-20240815152852271

①代表消息从生产者到交换机 - confirm模式

②代表消息从交换机路由到队列 - return模式\备用交换机

③代表消息在队列中存储 - 队列、交换机、消息持久化

④代表消费者监听消息

消息的幂等性

消息消费时的幂等性(消息不会被重复消费)

幂等性:对于一个资源,无论请求一次还是多次,对资源本身影响造成的影响时相同的,不会因为重复的请求而对该资源重复造成影响

解决非幂等性的方法:通过给消息设置唯一ID + Redis即可解决