# 延时队列

# 代码架构图

延时队列ttl

# 解释

# 交换机

  • x 普通交换机
  • y 死信交换机

# 队列

  • QA 普通
  • QB 普通
  • OD 死信

# 路由

  • XA 路由到 QA 队列
  • XB 路由到 QB 队列
  • YD 路由到 Y交换机在到 QD 死信队列

# 代码

# 配置类(路由,交换机,路由)


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQTTLConfig {

    public static final String X_NORMAL_DIRECT_EXCHANGE = "x";
    public static final String Y_DEAD_DIRECT_EXCHANGE = "y";

    public static final String QA_NORMAL_QUEUE = "qa";
    public static final String QD_DEAD_QUEUE = "qd";

    public static final String XA_NORMAL_ROUTING_KEY = "xa";
    public static final String YD_DAD_ROUTING_KEY = "yd";

    @Bean(X_NORMAL_DIRECT_EXCHANGE)
    public DirectExchange X() {
        return new DirectExchange(X_NORMAL_DIRECT_EXCHANGE);
    }

    @Bean(Y_DEAD_DIRECT_EXCHANGE)
    public DirectExchange Y() {
        return new DirectExchange(Y_DEAD_DIRECT_EXCHANGE);
    }

    @Bean(QA_NORMAL_QUEUE)
    public Queue qa() {
        Map<String, Object> arguments = new HashMap<>();
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_DIRECT_EXCHANGE);
        //路由
        arguments.put("x-dead-letter-routing-key", YD_DAD_ROUTING_KEY);
        //延时时间  ttl
        arguments.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QA_NORMAL_QUEUE).withArguments(arguments).build();
    }

    @Bean(QD_DEAD_QUEUE)
    public Queue qd() {
        return QueueBuilder.durable(QD_DEAD_QUEUE).build();
    }

    @Bean
    public Binding qaBindingX(@Qualifier(QA_NORMAL_QUEUE) Queue qa,
                              @Qualifier(X_NORMAL_DIRECT_EXCHANGE) DirectExchange x) {
        return BindingBuilder.bind(qa).to(x).with(XA_NORMAL_ROUTING_KEY);
    }

    @Bean
    public Binding qdBindingY(@Qualifier(QD_DEAD_QUEUE) Queue qd,
                              @Qualifier(Y_DEAD_DIRECT_EXCHANGE) DirectExchange y) {
        return BindingBuilder.bind(qd).to(y).with(YD_DAD_ROUTING_KEY);
    }
}

# 生产者

import com.example.mq_s.config.RabbitMQTTLConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;

@RestController
@RequestMapping("/p")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping()
    public String sendMessage(String msg) {
        System.out.println("发送延时10秒消息到MQ");
        rabbitTemplate.convertAndSend(
                RabbitMQTTLConfig.X_NORMAL_DIRECT_EXCHANGE,
                RabbitMQTTLConfig.XA_NORMAL_ROUTING_KEY,
                msg
        );
        System.out.println("发送成功当前时间 = >  " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        return "{msg: 发送成功,code:200}";
    }
}

# 消费者

import com.example.mq_s.config.RabbitMQTTLConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;


@Component
public class RabbitMQConsumer {

    @RabbitListener(queues = RabbitMQTTLConfig.QD_DEAD_QUEUE)
    public void listener(Message message, Channel channel) {
        System.out.println("监听到10秒延时消息");

        System.out.println("接收到的消息是 => " + new String(message.getBody(), StandardCharsets.UTF_8));

        System.out.println("当前时间是 => " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

    }
}

# 问题

如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

解决办法就是新增一个队列,不设置时间,由生产者发送消息时指定时间

生产者发送消息自己设置过期时间

警告

下面的代码虽然是生产者自己设置过期时间,但是有一个致命的问题就是,他是基于队列的

也就是先进先出,当第一个消息是10秒,第二个消息是2秒,那么就会先接收到10秒的,再是2秒的

因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

解决办法,使用插件

# 代码

基于上面的改进

# 配置类,跟上面差不多,增加了一个不加上TTL的队列

同样的去除掉了,只显示新增加的


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQTTLConfig {

    public static final String X_NORMAL_DIRECT_EXCHANGE = "x";

    public static final String QC_NORMAL_QUEUE = "qc";

    public static final String XC_NORMAL_ROUTING_KEY = "xc";

    @Bean(X_NORMAL_DIRECT_EXCHANGE)
    public DirectExchange X() {
        return new DirectExchange(X_NORMAL_DIRECT_EXCHANGE);
    }

    @Bean(QC_NORMAL_QUEUE)
    public Queue qc() {
        Map<String, Object> arguments = new HashMap<>();
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_DIRECT_EXCHANGE);
        //路由
        arguments.put("x-dead-letter-routing-key", YD_DAD_ROUTING_KEY);
        return QueueBuilder.durable(QC_NORMAL_QUEUE).withArguments(arguments).build();
    }

    @Bean
    public Binding qcBindingX(@Qualifier(QC_NORMAL_QUEUE) Queue qc,
                              @Qualifier(X_NORMAL_DIRECT_EXCHANGE) DirectExchange x) {
        return BindingBuilder.bind(qc).to(x).with(XC_NORMAL_ROUTING_KEY);
    }

}

# 生产者(自己指定时间)

    @GetMapping("custom/ttl")
    public String sendMessageCustomTTL(String msg) {
        System.out.println("发送延时10秒消息到MQ");
        rabbitTemplate.convertAndSend(
                RabbitMQTTLConfig.X_NORMAL_DIRECT_EXCHANGE,
                RabbitMQTTLConfig.XC_NORMAL_ROUTING_KEY,
                msg,
                message -> {
                    //十五秒
                    message.getMessageProperties().setExpiration("15000");
                    return message;
                }
        );
        System.out.println("发送成功当前时间 = >  " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        return "{msg: 发送成功,code:200}";
    }

# 消费者还是跟上面一样

# 插件实现延时队列

  1. 下载插件(放在阿里云盘上了)
    官网地址 (opens new window)
    下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
    目录地址:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

  2. 安装

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

死信交换机插件安装

  1. 进入可视化界面 exchange ,如果安装成功就会显示下面的 交换机插件

# 代码架构图

代码架构图

# 代码

# 配置

注意

因为是基于插件的,所以交换机名称,路由,队列必须按照这个名字


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQTTLConfig {

    // 基于插件的交换机,必须这个名字
    public static final String PLUGIN_DELAYED_EXCHANGE = "delayed.exchange";
    //队列
    public static final String PLUGIN_DELAYED_QUEUE = "delayed.queue";
    //路由
    public static final String PLUGIN_DELAYED_ROUTING_KEY = "delayed.routingkey";

    @Bean(PLUGIN_DELAYED_EXCHANGE)
    public CustomExchange pluginDelayedExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");

        /**
         * 1.交换机的名称
         * 2.交换机的类型
         * 3.是否需要持久化
         * 4.是否需要自动删除
         * 5.其它的参数
         */
        return new CustomExchange(PLUGIN_DELAYED_EXCHANGE, "x-delayed-message", true, false,
                arguments);
    }

    @Bean(PLUGIN_DELAYED_QUEUE)
    public Queue pluginDelayedQueue() {
        return QueueBuilder.durable(PLUGIN_DELAYED_QUEUE).build();
    }

    @Bean
    public Binding qaBindingX(@Qualifier(PLUGIN_DELAYED_EXCHANGE) CustomExchange de,
                              @Qualifier(PLUGIN_DELAYED_QUEUE) Queue dq) {
        return BindingBuilder.bind(dq).to(de).with(PLUGIN_DELAYED_ROUTING_KEY).noargs();
    }


}

# 生产者

注意调用的是:setDelay

    @GetMapping("/plugin/ttl/{delayTime}")
    public String sendMessageByPlugin(@PathVariable("delayTime") Integer delayTime) {
        System.out.println("基于插件的方式发送延时队列  接收到的时间是 " + delayTime);


        rabbitTemplate.convertAndSend(
                RabbitMQTTLConfig.PLUGIN_DELAYED_EXCHANGE,
                RabbitMQTTLConfig.PLUGIN_DELAYED_ROUTING_KEY,
                "插件的延时消息  " + (delayTime * 100),
                message -> {
                    message.getMessageProperties().setDelay(delayTime * 100);
                    return message;
                }
        );
        return "{msg: 发送成功,code:200}";
    }

# 消费者

@RabbitListener(queues = RabbitMQTTLConfig.PLUGIN_DELAYED_QUEUE)
public void listenerPluginDelayedQueue(Message message, Channel channel) {
    System.out.println("接收到的消息是 => " + new String(message.getBody(), StandardCharsets.UTF_8));
    System.out.println("当前时间是 => " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}