# 死信队列

# 概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信有死信自然就有了死信队列。

# 应用场景

为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中

比如说

  1. 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

# 死信的来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

# 代码

死信

# 消费者 c1


import com.example.mq_s.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;


/**
 * 消费者
 */
@Slf4j
public class C1 {
    /**
     * 普通交换机
     */
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    /**
     * 普通队列
     */
    private static final String NORMAL_QUEUE = "normal_queue";
    /**
     * 普通队列路由
     */
    private static final String NORMAL_ROUTING_KEY = "zhangsan";

    /**
     * 死信队列
     */
    private static final String DEAD_QUEUE = "dead_queue";
    /**
     * 死信路由
     */
    private static final String DEAD_ROUTING_KEY = "lisi";
    /**
     * 死信交换机
     */
    private static final String DEAD_EXCHANGE = "dead_exchange";



    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        //声明普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明普通队列
        //普通队列
        Map<String, Object> arguments = new HashMap<>();
        /**
         * 过期时间,单位毫秒,这里消费者不设置,通常是生产者设置,生产者可以随意设置过期时间更为灵活
         * 消费者设置了不可更改
         */
        //arguments.put("x-message-ttl",100000);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //设置死信routingkey
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        //队列最大长度,超过最大长度直接发送到死信队列
        //arguments.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);

        DeliverCallback deliverCallback = (consumerTag, message)->{
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            if (msg.contains("5")) {
                //拒绝
                //false 标识不放回队列
                //true 放回队列
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            }else {
                log.info("consumer-1,接收到的消息: {}",msg);
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };

        CancelCallback cancelCallback = (consumerTag)->{
            log.info("取消的消息: {}",consumerTag);
        };

        //假设这个普通消费者宕机了,那么就会按照上面的配置,进入死信交换机
        //channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);

        log.info("consumer-1,等待接收消息");
    }
}

# 消费者 c2(死信)

import com.example.mq_s.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;


/**
 * 消费者 消费死信消息
 */
@Slf4j
public class C2 {
    /**
     * 死信队列
     */
    private static final String DEAD_QUEUE = "dead_queue";


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        DeliverCallback deliverCallback = (consumerTag, message)->{
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("死信消费者监听消息 = " + msg);
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        CancelCallback cancelCallback = (consumerTag)->{
            log.info("取消的消息: {}",consumerTag);
        };

        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);
        log.info("consumer-2,等待接收消息");

    }



}

# 生产者


import com.example.mq_s.utils.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

import java.nio.charset.StandardCharsets;

/**
 * 生产者
 */
public class Producer {
    /**
     * 普通交换机
     */
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    /**
     * 普通队列路由
     */
    private static final String NORMAL_ROUTING_KEY = "zhangsan";


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        //死信消息,设置 TTL 时间,单位是毫秒
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties()
                        .builder()
                        //十秒
                        .expiration("300000")
                        .build();

        for (int i = 0; i < 10; i++) {
            String message = "消息 = " + i;
            channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, properties, message.getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("生产者发送完消息");
        channel.close();

    }

}