# 消息应答
# 概念
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。
RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
# 自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压
最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用。
# 手动应答
消息应答的方法
- A.Channel.basicAck(用于肯定确认) RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
- B.Channel.basicNack(用于否定确认)
- C.Channel.basicReject(用于否定确认)与 Channel.basicNack 相比少一个参数不处理该消息了直接拒绝,可以将其丢弃了
手动应答的好处是可以批量应答并且减少网络拥堵
multiple 的 true 和 false 代表不同意思
rue 代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
# 手动应答代码
# 生产者
import com.gao.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* author: 高亮
* time: 2022/7/5
* description: 消息应答不丢失,生产者
*/
@Slf4j
public class Producer_ACK {
private static final String ACK_QUEUE = "ack_queue";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(ACK_QUEUE, false, false, true, null);
for (int i = 0; i < 20; i++) {
byte[] message = ("ack message" + i).getBytes(StandardCharsets.UTF_8);
channel.basicPublish("", ACK_QUEUE, null, message);
}
log.info("发送完毕");
}
}
# 消费者
消费者有两个一个模拟只需要一秒就消费完,另一个要30秒,代码就是sleep 睡三十秒
import com.gao.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* author: 高亮
* time: 2022/7/5
* description: 消息应答,消费者
*/
@Slf4j
public class Consumer_ACK {
private static final String ACK_QUEUE = "ack_queue";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,message)->{
long tag = message.getEnvelope().getDeliveryTag();
log.info("接收到的消息: {}",new String(message.getBody(), StandardCharsets.UTF_8));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* 1.消息的标记tag
* 2.是否批量应答false:不批量应答信道中的消息 true:批量
*/
channel.basicAck(tag,false); //tag标记,false不批量
};
/**
* 消息被取消回调
*/
CancelCallback cancelCallback = (consumerTag)->{
};
//false 表示手动应答,true 自动应答
final boolean AUTO_ACK = false;
channel.basicConsume(ACK_QUEUE, AUTO_ACK, deliverCallback, cancelCallback);
log.info("消息接收完毕");
}
}
# 重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。
如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息。
← Work Queues 持久化 →