0%

RabbitMQ事务,confirm机制以及return机制


本文是参考下面这篇博客然后进行复现浓缩后的总结
https://www.cnblogs.com/vipstone/p/9350075.html

正常情况下,如果消息经过交换器进入队列就可以完成消息的持久化,但如果消息在没有到达broker之前出现意外,那就造成消息丢失,有没有办法可以解决这个问题?RabbitMQ有两种方式来解决这个问题:

  • 通过AMQP提供的事务机制实现;
  • 使用发送者确认模式实现;

事务使用

事物的实现主要是对于信道(Channel)的设置,其中主要的方法有三个:

  1. channel.txSelect()声明启动事务模式;
  2. channel.txComment()提交事务;
  3. channel.txRollback()回滚事务;

尝试代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 队列名, 持久化, 是否排外, 非自动删除
channel.queueDeclare("queueName",true, false, false, null);
String message = "这是一个测试消息";
try {
channel.txSelect();
// String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish("exchangeName", "queueName", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
channel.txCommit();
}catch (Exception e){
channel.txRollback();
}finally {
channel.close();
connection.close();
}
}

客户端与rabbitMQ的交互流程如下:

  1. 客户端发送给服务器Tx.Select(开启事务模式)
  2. 服务器端返回Tx.Select-Ok(开启事务模式ok)
  3. 推送消息
  4. 客户端发送给事务提交Tx.Commit
  5. 服务器端返回Tx.Commit-Ok

消费者模式使用事务

假设消费者模式中使用了事务,并且在消息确认之后进行了事务回滚,那么RabbitMQ会产生什么样的变化?结果分为两种情况:

  • autoAck=false手动确认的时候是支持事务的,也就是说即使你已经手动确认了消息,但客户端也会在确认事务返回消息之后,再做决定是确认消息还是重新放回队列,如果你手动确认之后,又回滚了事务,那么以事务回滚为主,此条消息会重新放回队列;
  • autoAck=true如果确认为true的情况是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了;

二、Confirm发送方确认模式

Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。Confirm的三种实现方式:
方式一:channel.waitForConfirms()普通发送方确认模式;
方式二:channel.waitForConfirmsOrDie()批量确认模式;
方式三:channel.addConfirmListener()异步监听发送方确认模式;

1.普通发送方确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 队列名, 持久化, 是否排外, 非自动删除
channel.queueDeclare("queueName",true, false, false, null);
String message = "这是一个测试消息";
// String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish("exchangeName", "queueName", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
if(channel.waitForConfirms()){
System.out.println("消息发送成功");
}
}

2.批量确认模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 队列名, 持久化, 是否排外, 非自动删除
channel.queueDeclare("queueName",true, false, false, null);
channel.confirmSelect();
for(int i = 0; i < 10; i++){
String message = "这是一个测试消息" + i;
// String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish("exchangeName", "queueName", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
}
channel.waitForConfirmsOrDie();//对所有消息进行等待,只要有一个未发送就会返回异常
}

3.异步监听发送方确认模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void test2() throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 队列名, 持久化, 是否排外, 非自动删除
channel.queueDeclare("queueName",true, false, false, null);
channel.confirmSelect();
for(int i = 0; i < 10; i++){
String message = "这是一个测试消息" + i;
// String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish("exchangeName", "queueName", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
}
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
System.out.println("消息发送失败,标识:" + l);
}

@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println(String.format("消息发送成功,标识:%d, 是否是多个同时确认:%b", l, b));
}
});
}

可以看出,代码是异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认。


二、Return消息机制

Return Listener用于处理一些不可路由的消息!
我们的消息生产者,通过指定一个Exchange 和Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!

在基础API中有一个关键的配置项:
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!

使用方式是在channel中添加一个ReturnListener

1
2
3
4
5
6
7
channel.addReturnListener (new ReturnListener () {
@Override
public void handleReturn (int replyCode, string replyText, String exchange, String routingKey,
AMQP.BasicProperties properties,byte[] body) throws IOException {
// 这里是业务代码
}
});

总结:

Confirm批量确定和Confirm异步模式性能相差不大,Confirm模式要比事务快10倍左右。