# 发布确认

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。

于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:

错误

应 用 [xxx] 在 [08-1516:36:04] 发 生 [ 错误日志异常 ] , alertId=[xxx] 。 由 [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620] 触发。 应用 xxx 可能原因如下 服务名为: 异常为: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620, 产 生 原 因 如 下 :1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.||Consumer received fatal=false exception on startup:

# 确认机制方案

确认机制

# 代码架构

发布确认代码架构

# 代码

# 配置

  • NONE:禁用发布确认模式,是默认值
  • CORRELATED:发布消息成功到交换器后会触发回调方法
  • SIMPLE:经测试有两种效果
    • 其一效果和 CORRELATED 值一样会触发回调方法
    • 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
spring:
  rabbitmq:
    publisher-confirm-type: correlated

# 配置类

@Configuration
public class RabbitConfirmConfig {

    public static final String CONFIRM_ACK_EXCHANGE = "confirm.exchange";
    public static final String CONFIRM_ACK_QUEUE = "confirm.queue";
    public static final String CONFIRM_ACK_ROUTING_KEY = "key1";


    @Bean(CONFIRM_ACK_EXCHANGE)
    public DirectExchange confirm() {
        return new DirectExchange(CONFIRM_ACK_EXCHANGE);
    }

    @Bean(CONFIRM_ACK_QUEUE)
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_ACK_QUEUE).build();
    }

    @Bean
    public Binding ackEToACkQ(@Qualifier(CONFIRM_ACK_EXCHANGE) DirectExchange ackE,
                              @Qualifier(CONFIRM_ACK_QUEUE) Queue queue) {
        return BindingBuilder.bind(queue).to(ackE).with(CONFIRM_ACK_ROUTING_KEY);
    }
}

# 生产者


import com.example.mq_s.config.RabbitConfirmConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

@RestController
@RequestMapping("ack")
public class ConfirmController implements RabbitTemplate.ConfirmCallback {

    @PostConstruct
    public void setConfirmBack() {
        //注入确认回调
        rabbitTemplate.setConfirmCallback(this);
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/publish")
    public String publishMessageAck(String msg) {
        rabbitTemplate.convertAndSend(RabbitConfirmConfig.CONFIRM_ACK_EXCHANGE,
                RabbitConfirmConfig.CONFIRM_ACK_ROUTING_KEY,
                msg,
                new CorrelationData("123")
        );

        return "yes";
    }

    /**
     * 回调方法,无论是否成功失败都回调
     *
     * @param correlationData 回调消息id 和相关信息(correlationData 是在发消息自己填写的,如果不写就是空的)
     * @param ack             是否成功
     * @param cause           失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("生产者发布消息成功");
        } else {
            System.out.println("生产者发布消息失败");
        }
    }
}

# 消费者跟以前一样只是监听打印一下