# direct 直接交换机
# 介绍
上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希 望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志 消息避免浪费磁盘空间。
Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的 广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。
在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃
# 多重绑定
当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。
# 代码
# 生产者
import com.gao.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* author: 高亮
* time: 2022/7/6
* description: 直接交换机,生产者
*/
@Slf4j
public class ExchangeDirect_Producer {
public static final String EXCHANGE_DIRECT = "direct_exchange";
public static final String ROUTING_KEY_ERROR= "error";
public static final String ROUTING_KEY_INFO = "info";
public static final String ROUTING_KEY_WARNING = "warning";
public static final int MESSAGE_COUNT = 30;
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "张三"+i;
channel.basicPublish(EXCHANGE_DIRECT, ROUTING_KEY_ERROR, null, message.getBytes(StandardCharsets.UTF_8));
}
log.info("发送完毕");
channel.close();
}
}
# 消费者
import com.gao.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* author: 高亮
* time: 2022/7/6
* description: 直接交换机,消费者
*/
@Slf4j
public class ExchangeDirect_Consumer {
public static final String QUEUE_CONSOLE = "console";
public static final String ROUTING_KEY_WARNING = "warning";
public static final String ROUTING_KEY_INFO = "info";
public static final String EXCHANGE_DIRECT = "direct_exchange";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_CONSOLE, false, false, true, null);
// 绑定info和warning路由
channel.queueBind(QUEUE_CONSOLE, EXCHANGE_DIRECT, ROUTING_KEY_INFO);
channel.queueBind(QUEUE_CONSOLE, EXCHANGE_DIRECT, ROUTING_KEY_WARNING);
DeliverCallback deliverCallback = (consumerTag, message)->{
log.info("接收到的消息: {}",new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = (consumerTag)->{
log.info("取消的消息: {}",consumerTag);
};
channel.basicConsume(QUEUE_CONSOLE,true,deliverCallback,cancelCallback);
log.info("等待接收消息");
}
}