# topic 主题交换机
# 之前问题
我们没有使用只能进行随意广播的 fanout 交换机,而是 使用了 direct 交换机,从而有能实现有选择性地接收日志。
尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候 就只能使用 topic 类型
# Topic 的要求
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单 词列表,以点号分隔开。
这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节。
- 在这个规则列表中,其中有两个替换符是大家需要注意的
- *(星号)可以代替一个单词
- #(井号)可以替代零个或多个单
# Topic 匹配案例
- Q1-->绑定的是 中间带 orange 带 3 个单词的字符串(.orange.)
- Q2-->绑定的是 最后一个单词是 rabbit 的 3 个单词(..rabbit) 第一个单词是 lazy 的多个单词(lazy.#)
- quick.orange.rabbit 被队列 Q1Q2 接收到
- lazy.orange.elephant 被队列 Q1Q2 接收到
- quick.orange.fox 被队列 Q1 接收到
- lazy.brown.fox 被队列 Q2 接收到
- azy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
- quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
- quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
- lazy.orange.male.rabbit 是四个单词但匹配 Q2
# 代码
# 生产者
import com.gao.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* author: 高亮
* time: 2022/7/6
* description: 扇出交换机,生产者
*/
@Slf4j
public class ExchangeTipic_Producer {
private static final String EXCHANGE_TOPIC = "topic_logs";
private static final String ROUTING_KEY_QUICK = "quick.orange.rabbit";
private static final String ROUTING_KEY_LAZY = "lazy.orange.elephant";
private static final String ROUTING_KEY_FOX = "quick.orange.fox";
public static final int MESSAGE_COUNT = 30;
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtils.getChannel();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "张三"+i;
channel.basicPublish(EXCHANGE_TOPIC,ROUTING_KEY_LAZY,null,message.getBytes(StandardCharsets.UTF_8));
}
log.info("发送完毕");
}
}
# 消费者
import com.gao.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* author: 高亮
* time: 2022/7/6
* description: 主题交换机,消费者
*/
@Slf4j
public class Exchange_Topic_Consumer {
private static final String QUEUE_Q1 = "Q1";
private static final String ROUTING_KEY_ORANGE = "*.orange.*";
private static final String EXCHANGE_TOPIC = "topic_logs";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_Q1,false,false,true,null);
channel.queueBind(QUEUE_Q1,EXCHANGE_TOPIC,ROUTING_KEY_ORANGE);
DeliverCallback deliverCallback = (consumerTag, message)->{
log.info("接收到的消息: {}",new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = (consumerTag)->{
log.info("取消的消息: {}",consumerTag);
};
channel.basicConsume(QUEUE_Q1,true,deliverCallback,cancelCallback);
log.info("1_等待接收消息");
}
}
← direct 直接交换机 死信队列 →