网站建设云/长沙市云网站建设
目录
一、概述
1、消息可靠性
2、SpringBoot整合RabbitMQ配置文件
二、生产者---RabbitMQ服务器如何保证信息不丢失
1、confirm确认模式
1.说明
2.SpringBoot代码实现
2、return退回模式
1.说明
2.SpringBoot代码实现
三、RabbitMQ服务器如何保证消息不丢失
四、RabbitMQ服务器---消费者如何保证消息不丢失
1、ACK
2、实现方式
3、SpringBoot代码实现
一、概述
1、消息可靠性
当生产者生产出一条消息发送给MQ是时,该消息来到MQ服务器会先到达交换机,然后由交换机根据路由分发给对应的队列,然后再由MQ服务器给消费者进行消费。这个步骤可分为三个过程分别是消息从消费者到交换机、消息从交换机到队列、消息从队列到消费者。那么RabbitMQ是如何保证消息的可靠性的呢。
在前两个过程,也就是消息从生产者到达服务器的过程,RabbitMQ提供了两种保证消息从生产者到MQ服务器消息可靠性的方法,分别是confirm确认模式与return退回模式。而在第三个过程,也就是消息从MQ服务器到消费者的过程,RabbitMQ提供了ACK模式,如果学习过计算机网络TCP协议就回明白,ACK(ackonwledge承认)当消费者收到MQ服务器的消息后,会给MQ服务器返回一个确认信息,服务器收到ack才会对该消息进行删除。
2、SpringBoot整合RabbitMQ配置文件
在了解这几个机制之前,先了解以下SpringBoot整合RabbitMQ的一些配置信息(yml形式)
spring:rabbitmq:host: 127.0.0.1 #RabbitMQ服务器IPport: 5672 #端口username: guest #用户名password: guest #密码virtual-host: /learn #虚拟机publisher-confirm-type: correlated #开启确认机制publisher-returns: true #开启回退模式listener:simple:acknowledge-mode: manual #开启收动签收prefetch: 4 #消费端每次拉去10条数据,直到确认消费完毕才拉去下10条retry:enabled: true #开启重试max-attempts: 4 #重试最大次数max-interval: 1000s #重试最大时长
二、生产者---RabbitMQ服务器如何保证信息不丢失
1、confirm确认模式
1.说明
当消息从producer生产者到达exchange交换机,会以异步地给消费者返回一个confirmCallbak回调,如果交换机收到了就返回true如果没有收到则返回false,如果返回false生产者收到该信息后可进行重发等处理
2.SpringBoot代码实现
首先我们需要创建一个RabbitMQ地配置类并注入Spring里,让他可以随着项目地启动而启动,然后我们在类里需要先创建队列以及交换机,此出我们使用topic模式进行演示【RabbitMQ】SpringBoot整合RabbitMQ、实现RabbitMQ五大工作模式(万字长文)_1373i的博客-CSDN博客https://blog.csdn.net/qq_61903414/article/details/130174313?spm=1001.2014.3001.5501
package com.example.demo.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 配置RabbitMQ* 配置交换机、队列、绑定队列与交换机*/
@Configuration
public class RabbitMQConfig {/*** topic通配符模式*/private static final String T_QUEUE1 = "tQueue1";private static final String T_QUEUE2 = "tQueue2";private static final String T_EXCHANGE = "tEx";@Beanpublic Queue tQueue1() {return QueueBuilder.durable(T_QUEUE1).build();}@Beanpublic Queue tQueue2() {return new Queue(T_QUEUE2);}@Beanpublic TopicExchange tEx() {return ExchangeBuilder.topicExchange(T_EXCHANGE).durable(true).build();}@Beanpublic Binding binding1(@Qualifier("tQueue1") Queue tQueue1,@Qualifier("tEx") TopicExchange tEx) {return BindingBuilder.bind(tQueue1).to(tEx).with("A.*");}@Beanpublic Binding binding2(@Qualifier("tQueue2") Queue tQueue2,@Qualifier("tEx") TopicExchange tEx) {return BindingBuilder.bind(tQueue2).to(tEx).with("#.error");}}
其次我们要在配置文件里开启RabbitMQ地确认模式
此时我们就可以编写生产者类通过这个方法abbitTemplate.setConfirmCallback实现回调方法
package com.example.demo.controller;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;import javax.servlet.http.HttpServletRequest;@Controller
@ResponseBody
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/tEx")public void sendByT() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 配置信息* @param b 是否手动消息* @param s 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm被执行");if (!b) {System.out.println("接收失败" + s);// 处理,让消息重发}}});// 故意修改错交换机名称,运行查看回调方法的执行rabbitTemplate.convertAndSend("tExx","A.error","hello mq");}}
在发送消息时故意写错交换机名称,运行代码访问127.0.0.1:8080/tEx接口查看回调方法的执行
此时生产者代码打印了错误信息,此时我们可以通过代码对该消息进行重发处理
2、return退回模式
1.说明
当消息到达交换机以后就会根据路由key去到达匹配的队列里,如果消息在该过程没有到达queue。就会异步地返回一个returnCallback的回调,将错误信息告诉生产者,生产者可进行后续处理,当交换机消息由于路由问题没有到达队列时,此时交换机对消息的处理有两种方式,默认方式是直接丢弃,另一种是将消息返回给生产者,在后续代码实现时,我们要设置第二种方式
2.SpringBoot代码实现
首先要在配置文件里开启回退模式
然后在生产者类代码里这次我们要通过rabbitTemplate.setReturnCallback实现回调方法
package com.example.demo.controller;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;import javax.servlet.http.HttpServletRequest;@Controller
@ResponseBody
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/tEx")public void sendByT() {rabbitTemplate.setMandatory(true); // 设为true消息由交换机给queue失败时返回给发送者rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnsCallback(){/**** @param message 回退的信息* @param replyCode 错误码* @param replyText 错误信息* @param exchange 交换机* @param routingKey 路由*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("执行了回退方法");}});// 故意写错路由让消息到交换机后无法到达队列rabbitTemplate.convertAndSend("tEx","lolo","hello mq");}}
此时运行代码,访问接口查看回调方法的执行
三、RabbitMQ服务器如何保证消息不丢失
在MQ服务器里交换机、队列等都是默认持久化到硬盘里的,消息到达MQ服务器是默认存储在硬盘里的。所以数据不会因为MQ服务器宕机重启而导致丢失。
四、RabbitMQ服务器---消费者如何保证消息不丢失
1、ACK
消费者收到消息后会给MQ服务器返回收到消息的确认信息,而确认在MQ里确认信息有三种,粉分别是:自动确认(默认)手动确认(manual)根据异常进行确认(auto)第三种比较麻烦。其中自动确认是指当消息一旦被消费者接收,就自动确认收到,MQ服务器就会对该消息进行删除。但是在业务代码里,消息收到后,业务可能会出现异常导致消息没有被真正的消费,那么消息就丢失了,此时就有了手动确认的方法,手动设置需要在业务执行完成后调用channel.basicAck()方法手动签收,而如果执行过程中代码出现了异常也可以使用channel.basicNack()方法进行拒收,让MQ服务器自动重发消息
2、实现方式
在上述介绍了三种实现方式,默认是自动确认但存在缺陷,下面我们用代码实现手动确认
3、SpringBoot代码实现
首先要做配置文件里添加开启自动确认的属性
然后创建一个消费者类并注入Spring中
此时编写消费者代码,手动确认与拒绝确认API在下面代码注释中有讲解
package com.example.demo.controller;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class DemoQueueListener {/*** 手动签收* @param message* @param channel* @throws IOException*/@RabbitListener(queues = "fQueue1")public void listener(Message message, Channel channel) throws IOException {long tag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消费消息" + new String(message.getBody())); // 处理消息/*** 参数1 :收到消息的标签* 参数2 :false--签收所有的消息*/int i = 10 / 0;// 故意不手动接收 抛出异常 看MQ是否重发// channel.basicAck(tag, true); // 确认签收} catch (Exception e) {/*** 参数3:true--消息重回队列,会重发该消息 false---不回*/channel.basicNack(tag,true,true); // 拒绝签收}}
}
此时运行代码查看消息是否被消费,该队列只有一条消息,看是否会被消费
此时消息不但没有被消费,还被持续重发进行重复消费