# direct 直接交换机

# 介绍

上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希 望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志 消息避免浪费磁盘空间。

Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的 广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。

直接交换机

在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.

在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃

# 多重绑定

多重绑定 当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。

# 代码

# 生产者

import com.gao.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * author:      高亮
 * time:        2022/7/6
 * description: 直接交换机,生产者
 */
@Slf4j
public class ExchangeDirect_Producer {
    public static final String EXCHANGE_DIRECT = "direct_exchange";
    public static final String ROUTING_KEY_ERROR= "error";
    public static final String ROUTING_KEY_INFO = "info";
    public static final String ROUTING_KEY_WARNING = "warning";
    public static final int MESSAGE_COUNT = 30;

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "张三"+i;
            channel.basicPublish(EXCHANGE_DIRECT, ROUTING_KEY_ERROR, null, message.getBytes(StandardCharsets.UTF_8));
        }
        log.info("发送完毕");
        channel.close();
        
    }
}

# 消费者

import com.gao.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * author:      高亮
 * time:        2022/7/6
 * description: 直接交换机,消费者
 */
@Slf4j
public class ExchangeDirect_Consumer {
    public static final String QUEUE_CONSOLE = "console";
    public static final String ROUTING_KEY_WARNING = "warning";
    public static final String ROUTING_KEY_INFO = "info";
    public static final String EXCHANGE_DIRECT = "direct_exchange";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT);

        channel.queueDeclare(QUEUE_CONSOLE, false, false, true, null);

        // 绑定info和warning路由
        channel.queueBind(QUEUE_CONSOLE, EXCHANGE_DIRECT, ROUTING_KEY_INFO);
        channel.queueBind(QUEUE_CONSOLE, EXCHANGE_DIRECT, ROUTING_KEY_WARNING);

        DeliverCallback deliverCallback = (consumerTag, message)->{
            log.info("接收到的消息: {}",new String(message.getBody(), StandardCharsets.UTF_8));
        };

        CancelCallback cancelCallback = (consumerTag)->{
            log.info("取消的消息: {}",consumerTag);
        };

        channel.basicConsume(QUEUE_CONSOLE,true,deliverCallback,cancelCallback);
        log.info("等待接收消息");
    }
}