# 延时队列
# 代码架构图
# 解释
# 交换机
- x 普通交换机
- y 死信交换机
# 队列
- QA 普通
- QB 普通
- OD 死信
# 路由
- XA 路由到 QA 队列
- XB 路由到 QB 队列
- YD 路由到 Y交换机在到 QD 死信队列
# 代码
# 配置类(路由,交换机,路由)
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQTTLConfig {
public static final String X_NORMAL_DIRECT_EXCHANGE = "x";
public static final String Y_DEAD_DIRECT_EXCHANGE = "y";
public static final String QA_NORMAL_QUEUE = "qa";
public static final String QD_DEAD_QUEUE = "qd";
public static final String XA_NORMAL_ROUTING_KEY = "xa";
public static final String YD_DAD_ROUTING_KEY = "yd";
@Bean(X_NORMAL_DIRECT_EXCHANGE)
public DirectExchange X() {
return new DirectExchange(X_NORMAL_DIRECT_EXCHANGE);
}
@Bean(Y_DEAD_DIRECT_EXCHANGE)
public DirectExchange Y() {
return new DirectExchange(Y_DEAD_DIRECT_EXCHANGE);
}
@Bean(QA_NORMAL_QUEUE)
public Queue qa() {
Map<String, Object> arguments = new HashMap<>();
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_DIRECT_EXCHANGE);
//路由
arguments.put("x-dead-letter-routing-key", YD_DAD_ROUTING_KEY);
//延时时间 ttl
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QA_NORMAL_QUEUE).withArguments(arguments).build();
}
@Bean(QD_DEAD_QUEUE)
public Queue qd() {
return QueueBuilder.durable(QD_DEAD_QUEUE).build();
}
@Bean
public Binding qaBindingX(@Qualifier(QA_NORMAL_QUEUE) Queue qa,
@Qualifier(X_NORMAL_DIRECT_EXCHANGE) DirectExchange x) {
return BindingBuilder.bind(qa).to(x).with(XA_NORMAL_ROUTING_KEY);
}
@Bean
public Binding qdBindingY(@Qualifier(QD_DEAD_QUEUE) Queue qd,
@Qualifier(Y_DEAD_DIRECT_EXCHANGE) DirectExchange y) {
return BindingBuilder.bind(qd).to(y).with(YD_DAD_ROUTING_KEY);
}
}
# 生产者
import com.example.mq_s.config.RabbitMQTTLConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.text.SimpleDateFormat;
import java.util.Date;
@RestController
@RequestMapping("/p")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping()
public String sendMessage(String msg) {
System.out.println("发送延时10秒消息到MQ");
rabbitTemplate.convertAndSend(
RabbitMQTTLConfig.X_NORMAL_DIRECT_EXCHANGE,
RabbitMQTTLConfig.XA_NORMAL_ROUTING_KEY,
msg
);
System.out.println("发送成功当前时间 = > " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
return "{msg: 发送成功,code:200}";
}
}
# 消费者
import com.example.mq_s.config.RabbitMQTTLConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class RabbitMQConsumer {
@RabbitListener(queues = RabbitMQTTLConfig.QD_DEAD_QUEUE)
public void listener(Message message, Channel channel) {
System.out.println("监听到10秒延时消息");
System.out.println("接收到的消息是 => " + new String(message.getBody(), StandardCharsets.UTF_8));
System.out.println("当前时间是 => " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
}
# 问题
如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
解决办法就是新增一个队列,不设置时间,由生产者发送消息时指定时间
警告
下面的代码虽然是生产者自己设置过期时间,但是有一个致命的问题就是,他是基于队列的
也就是先进先出,当第一个消息是10秒,第二个消息是2秒,那么就会先接收到10秒的,再是2秒的
因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
# 代码
基于上面的改进
# 配置类,跟上面差不多,增加了一个不加上TTL的队列
同样的去除掉了,只显示新增加的
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQTTLConfig {
public static final String X_NORMAL_DIRECT_EXCHANGE = "x";
public static final String QC_NORMAL_QUEUE = "qc";
public static final String XC_NORMAL_ROUTING_KEY = "xc";
@Bean(X_NORMAL_DIRECT_EXCHANGE)
public DirectExchange X() {
return new DirectExchange(X_NORMAL_DIRECT_EXCHANGE);
}
@Bean(QC_NORMAL_QUEUE)
public Queue qc() {
Map<String, Object> arguments = new HashMap<>();
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_DIRECT_EXCHANGE);
//路由
arguments.put("x-dead-letter-routing-key", YD_DAD_ROUTING_KEY);
return QueueBuilder.durable(QC_NORMAL_QUEUE).withArguments(arguments).build();
}
@Bean
public Binding qcBindingX(@Qualifier(QC_NORMAL_QUEUE) Queue qc,
@Qualifier(X_NORMAL_DIRECT_EXCHANGE) DirectExchange x) {
return BindingBuilder.bind(qc).to(x).with(XC_NORMAL_ROUTING_KEY);
}
}
# 生产者(自己指定时间)
@GetMapping("custom/ttl")
public String sendMessageCustomTTL(String msg) {
System.out.println("发送延时10秒消息到MQ");
rabbitTemplate.convertAndSend(
RabbitMQTTLConfig.X_NORMAL_DIRECT_EXCHANGE,
RabbitMQTTLConfig.XC_NORMAL_ROUTING_KEY,
msg,
message -> {
//十五秒
message.getMessageProperties().setExpiration("15000");
return message;
}
);
System.out.println("发送成功当前时间 = > " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
return "{msg: 发送成功,code:200}";
}
# 消费者还是跟上面一样
# 插件实现延时队列
下载插件(放在阿里云盘上了)
官网地址 (opens new window)
下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
目录地址:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 进入可视化界面 exchange ,如果安装成功就会显示下面的
# 代码架构图
# 代码
# 配置
注意
因为是基于插件的,所以交换机名称,路由,队列必须按照这个名字
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQTTLConfig {
// 基于插件的交换机,必须这个名字
public static final String PLUGIN_DELAYED_EXCHANGE = "delayed.exchange";
//队列
public static final String PLUGIN_DELAYED_QUEUE = "delayed.queue";
//路由
public static final String PLUGIN_DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean(PLUGIN_DELAYED_EXCHANGE)
public CustomExchange pluginDelayedExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
/**
* 1.交换机的名称
* 2.交换机的类型
* 3.是否需要持久化
* 4.是否需要自动删除
* 5.其它的参数
*/
return new CustomExchange(PLUGIN_DELAYED_EXCHANGE, "x-delayed-message", true, false,
arguments);
}
@Bean(PLUGIN_DELAYED_QUEUE)
public Queue pluginDelayedQueue() {
return QueueBuilder.durable(PLUGIN_DELAYED_QUEUE).build();
}
@Bean
public Binding qaBindingX(@Qualifier(PLUGIN_DELAYED_EXCHANGE) CustomExchange de,
@Qualifier(PLUGIN_DELAYED_QUEUE) Queue dq) {
return BindingBuilder.bind(dq).to(de).with(PLUGIN_DELAYED_ROUTING_KEY).noargs();
}
}
# 生产者
注意调用的是:setDelay
@GetMapping("/plugin/ttl/{delayTime}")
public String sendMessageByPlugin(@PathVariable("delayTime") Integer delayTime) {
System.out.println("基于插件的方式发送延时队列 接收到的时间是 " + delayTime);
rabbitTemplate.convertAndSend(
RabbitMQTTLConfig.PLUGIN_DELAYED_EXCHANGE,
RabbitMQTTLConfig.PLUGIN_DELAYED_ROUTING_KEY,
"插件的延时消息 " + (delayTime * 100),
message -> {
message.getMessageProperties().setDelay(delayTime * 100);
return message;
}
);
return "{msg: 发送成功,code:200}";
}
# 消费者
@RabbitListener(queues = RabbitMQTTLConfig.PLUGIN_DELAYED_QUEUE)
public void listenerPluginDelayedQueue(Message message, Channel channel) {
System.out.println("接收到的消息是 => " + new String(message.getBody(), StandardCharsets.UTF_8));
System.out.println("当前时间是 => " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}