# 死信队列
# 概念
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信有死信自然就有了死信队列。
# 应用场景
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中
比如说
- 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
# 死信的来源
- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
# 代码
# 消费者 c1
import com.example.mq_s.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* 消费者
*/
@Slf4j
public class C1 {
/**
* 普通交换机
*/
private static final String NORMAL_EXCHANGE = "normal_exchange";
/**
* 普通队列
*/
private static final String NORMAL_QUEUE = "normal_queue";
/**
* 普通队列路由
*/
private static final String NORMAL_ROUTING_KEY = "zhangsan";
/**
* 死信队列
*/
private static final String DEAD_QUEUE = "dead_queue";
/**
* 死信路由
*/
private static final String DEAD_ROUTING_KEY = "lisi";
/**
* 死信交换机
*/
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//声明普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明普通队列
//普通队列
Map<String, Object> arguments = new HashMap<>();
/**
* 过期时间,单位毫秒,这里消费者不设置,通常是生产者设置,生产者可以随意设置过期时间更为灵活
* 消费者设置了不可更改
*/
//arguments.put("x-message-ttl",100000);
//正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//设置死信routingkey
arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
//队列最大长度,超过最大长度直接发送到死信队列
//arguments.put("x-max-length",6);
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);
DeliverCallback deliverCallback = (consumerTag, message)->{
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
if (msg.contains("5")) {
//拒绝
//false 标识不放回队列
//true 放回队列
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}else {
log.info("consumer-1,接收到的消息: {}",msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
CancelCallback cancelCallback = (consumerTag)->{
log.info("取消的消息: {}",consumerTag);
};
//假设这个普通消费者宕机了,那么就会按照上面的配置,进入死信交换机
//channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);
log.info("consumer-1,等待接收消息");
}
}
# 消费者 c2(死信)
import com.example.mq_s.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* 消费者 消费死信消息
*/
@Slf4j
public class C2 {
/**
* 死信队列
*/
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message)->{
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("死信消费者监听消息 = " + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (consumerTag)->{
log.info("取消的消息: {}",consumerTag);
};
channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);
log.info("consumer-2,等待接收消息");
}
}
# 生产者
import com.example.mq_s.utils.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
/**
* 生产者
*/
public class Producer {
/**
* 普通交换机
*/
private static final String NORMAL_EXCHANGE = "normal_exchange";
/**
* 普通队列路由
*/
private static final String NORMAL_ROUTING_KEY = "zhangsan";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//死信消息,设置 TTL 时间,单位是毫秒
AMQP.BasicProperties properties =
new AMQP.BasicProperties()
.builder()
//十秒
.expiration("300000")
.build();
for (int i = 0; i < 10; i++) {
String message = "消息 = " + i;
channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, properties, message.getBytes(StandardCharsets.UTF_8));
}
System.out.println("生产者发送完消息");
channel.close();
}
}
← topic 主题交换机 延迟队列 →