# topic 主题交换机

# 之前问题

我们没有使用只能进行随意广播的 fanout 交换机,而是 使用了 direct 交换机,从而有能实现有选择性地接收日志。

尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候 就只能使用 topic 类型

# Topic 的要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单 词列表,以点号分隔开。

这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节。

  • 在这个规则列表中,其中有两个替换符是大家需要注意的
    • *(星号)可以代替一个单词
    • #(井号)可以替代零个或多个单

# Topic 匹配案例

  • Q1-->绑定的是 中间带 orange 带 3 个单词的字符串(.orange.)
  • Q2-->绑定的是 最后一个单词是 rabbit 的 3 个单词(..rabbit) 第一个单词是 lazy 的多个单词(lazy.#)

主题匹配案例

  • quick.orange.rabbit 被队列 Q1Q2 接收到
  • lazy.orange.elephant 被队列 Q1Q2 接收到
  • quick.orange.fox 被队列 Q1 接收到
  • lazy.brown.fox 被队列 Q2 接收到
  • azy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
  • quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
  • quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
  • lazy.orange.male.rabbit 是四个单词但匹配 Q2

# 代码

# 生产者

import com.gao.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * author:      高亮
 * time:        2022/7/6
 * description: 扇出交换机,生产者
 */
@Slf4j
public class ExchangeTipic_Producer {
    private static final String EXCHANGE_TOPIC = "topic_logs";
    private static final String ROUTING_KEY_QUICK = "quick.orange.rabbit";
    private static final String ROUTING_KEY_LAZY = "lazy.orange.elephant";
    private static final String ROUTING_KEY_FOX = "quick.orange.fox";
    public static final int MESSAGE_COUNT = 30;

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtils.getChannel();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "张三"+i;
            channel.basicPublish(EXCHANGE_TOPIC,ROUTING_KEY_LAZY,null,message.getBytes(StandardCharsets.UTF_8));
        }
        log.info("发送完毕");
    }
}

# 消费者

import com.gao.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * author:      高亮
 * time:        2022/7/6
 * description: 主题交换机,消费者
 */

@Slf4j
public class Exchange_Topic_Consumer {
    private static final String QUEUE_Q1 = "Q1";
    private static final String ROUTING_KEY_ORANGE = "*.orange.*";
    private static final String EXCHANGE_TOPIC = "topic_logs";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC);
        channel.queueDeclare(QUEUE_Q1,false,false,true,null);
        channel.queueBind(QUEUE_Q1,EXCHANGE_TOPIC,ROUTING_KEY_ORANGE);
        DeliverCallback deliverCallback = (consumerTag, message)->{
            log.info("接收到的消息: {}",new String(message.getBody(), StandardCharsets.UTF_8));
        };
        CancelCallback cancelCallback = (consumerTag)->{
            log.info("取消的消息: {}",consumerTag);
        };

        channel.basicConsume(QUEUE_Q1,true,deliverCallback,cancelCallback);
        log.info("1_等待接收消息");
    }
}