本文是参考下面这篇博客然后进行复现浓缩后的总结
https://www.cnblogs.com/vipstone/p/9350075.html
正常情况下,如果消息经过交换器进入队列就可以完成消息的持久化,但如果消息在没有到达broker之前出现意外,那就造成消息丢失,有没有办法可以解决这个问题?RabbitMQ有两种方式来解决这个问题:
- 通过AMQP提供的事务机制实现;
- 使用发送者确认模式实现;
事务使用
事物的实现主要是对于信道(Channel)的设置,其中主要的方法有三个:
- channel.txSelect()声明启动事务模式;
- channel.txComment()提交事务;
- channel.txRollback()回滚事务;
尝试代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("queueName",true, false, false, null); String message = "这是一个测试消息"; try { channel.txSelect(); channel.basicPublish("exchangeName", "queueName", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); channel.txCommit(); }catch (Exception e){ channel.txRollback(); }finally { channel.close(); connection.close(); } }
|
客户端与rabbitMQ的交互流程如下:
- 客户端发送给服务器Tx.Select(开启事务模式)
- 服务器端返回Tx.Select-Ok(开启事务模式ok)
- 推送消息
- 客户端发送给事务提交Tx.Commit
- 服务器端返回Tx.Commit-Ok
消费者模式使用事务
假设消费者模式中使用了事务,并且在消息确认之后进行了事务回滚,那么RabbitMQ会产生什么样的变化?结果分为两种情况:
- autoAck=false手动确认的时候是支持事务的,也就是说即使你已经手动确认了消息,但客户端也会在确认事务返回消息之后,再做决定是确认消息还是重新放回队列,如果你手动确认之后,又回滚了事务,那么以事务回滚为主,此条消息会重新放回队列;
- autoAck=true如果确认为true的情况是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了;
二、Confirm发送方确认模式
Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。Confirm的三种实现方式:
方式一:channel.waitForConfirms()普通发送方确认模式;
方式二:channel.waitForConfirmsOrDie()批量确认模式;
方式三:channel.addConfirmListener()异步监听发送方确认模式;
1.普通发送方确认
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("queueName",true, false, false, null); String message = "这是一个测试消息"; channel.basicPublish("exchangeName", "queueName", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); if(channel.waitForConfirms()){ System.out.println("消息发送成功"); } }
|
2.批量确认模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("queueName",true, false, false, null); channel.confirmSelect(); for(int i = 0; i < 10; i++){ String message = "这是一个测试消息" + i; channel.basicPublish("exchangeName", "queueName", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); } channel.waitForConfirmsOrDie(); }
|
3.异步监听发送方确认模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public void test2() throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("queueName",true, false, false, null); channel.confirmSelect(); for(int i = 0; i < 10; i++){ String message = "这是一个测试消息" + i; channel.basicPublish("exchangeName", "queueName", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); } channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long l, boolean b) throws IOException { System.out.println("消息发送失败,标识:" + l); }
@Override public void handleNack(long l, boolean b) throws IOException { System.out.println(String.format("消息发送成功,标识:%d, 是否是多个同时确认:%b", l, b)); } }); }
|
可以看出,代码是异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认。
二、Return消息机制
Return Listener用于处理一些不可路由的消息!
我们的消息生产者,通过指定一个Exchange 和Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
在基础API中有一个关键的配置项:
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
使用方式是在channel中添加一个ReturnListener
1 2 3 4 5 6 7
| channel.addReturnListener (new ReturnListener () { @Override public void handleReturn (int replyCode, string replyText, String exchange, String routingKey, AMQP.BasicProperties properties,byte[] body) throws IOException { } });
|
总结:
Confirm批量确定和Confirm异步模式性能相差不大,Confirm模式要比事务快10倍左右。