# 发布确认
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。
于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:
错误
应 用 [xxx] 在 [08-1516:36:04] 发 生 [ 错误日志异常 ] , alertId=[xxx] 。 由 [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620] 触发。 应用 xxx 可能原因如下 服务名为: 异常为: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620, 产 生 原 因 如 下 :1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.||Consumer received fatal=false exception on startup:
# 确认机制方案
# 代码架构
# 代码
# 配置
- NONE:禁用发布确认模式,是默认值
- CORRELATED:发布消息成功到交换器后会触发回调方法
- SIMPLE:经测试有两种效果
- 其一效果和 CORRELATED 值一样会触发回调方法
- 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
spring:
rabbitmq:
publisher-confirm-type: correlated
# 配置类
@Configuration
public class RabbitConfirmConfig {
public static final String CONFIRM_ACK_EXCHANGE = "confirm.exchange";
public static final String CONFIRM_ACK_QUEUE = "confirm.queue";
public static final String CONFIRM_ACK_ROUTING_KEY = "key1";
@Bean(CONFIRM_ACK_EXCHANGE)
public DirectExchange confirm() {
return new DirectExchange(CONFIRM_ACK_EXCHANGE);
}
@Bean(CONFIRM_ACK_QUEUE)
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_ACK_QUEUE).build();
}
@Bean
public Binding ackEToACkQ(@Qualifier(CONFIRM_ACK_EXCHANGE) DirectExchange ackE,
@Qualifier(CONFIRM_ACK_QUEUE) Queue queue) {
return BindingBuilder.bind(queue).to(ackE).with(CONFIRM_ACK_ROUTING_KEY);
}
}
# 生产者
import com.example.mq_s.config.RabbitConfirmConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
@RestController
@RequestMapping("ack")
public class ConfirmController implements RabbitTemplate.ConfirmCallback {
@PostConstruct
public void setConfirmBack() {
//注入确认回调
rabbitTemplate.setConfirmCallback(this);
}
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/publish")
public String publishMessageAck(String msg) {
rabbitTemplate.convertAndSend(RabbitConfirmConfig.CONFIRM_ACK_EXCHANGE,
RabbitConfirmConfig.CONFIRM_ACK_ROUTING_KEY,
msg,
new CorrelationData("123")
);
return "yes";
}
/**
* 回调方法,无论是否成功失败都回调
*
* @param correlationData 回调消息id 和相关信息(correlationData 是在发消息自己填写的,如果不写就是空的)
* @param ack 是否成功
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("生产者发布消息成功");
} else {
System.out.println("生产者发布消息失败");
}
}
}