RabbitMQ 全面整合
RabbitMQ
一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,由于erlang 语言的高并发特性,性能较好,本质是个队列, FIFO 先入先出,里面存放的内容是message. 支持持久化
它是一个消息中间件:它接收消息并且转发,就类似于一个快递站,卖家把快递通过快递站,送到我们的手上,MQ也是这样,接收并存储消息,再转发
这里提供一张各类中间件的结构说明
应用场景
- 1:跨系统数据传递
就比较好理解,当A, B两个不同的系统需要不定时的进行数据传递,作为消息中间件自然是能够满足跨系统的数据传递,A系统放入数据,B系统收到提示或定时的去取即可
- 2:高并发的流量削峰
就是在访问量剧增的情况下,比如“双十一”下单,但是淘宝这个应用仍然要运行,所以就可以使用消息中间件采用队列的形式减少突然访问的压力
- 3:数据的分发和异步处理
有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A 服务。
这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息
- 4:大数据分析与传递
- 5:分布式事务
和redis中list类似,利用队列结构的特性实现分布式事务,进来一个,待执行完后则消费掉,在此之前请求会一直堵塞,消费后在进行下一个请求
详情介绍
操作界面
服务器上先安装好Erlang(因为Erlang语言开发的,所以需要安装环境)和RabbitMQ,具体百度,安装好后会看到一个管理界面, 首页Overview的概览情况,里面包括集群各个节点的信息、端口映射信息;而其它的分别对应是(详情介绍看传送门2):
- Connection(连接)
- Channel(信道)
- Exchange(交换机)
- Queue(队列)
- Admin (用户)
工作原理
RabbitMQ的工作模型如下
其中,中间的Broker表示RabbitMQ服务,每个Broker里面至少有一个Virtual host虚拟主机,每个虚拟主机中有自己的Exchange交换机、Queue队列以及Exchange交换机与Queue队列之间的绑定关系Binding。producer(生产者)和consumer(消费者)通过与Broker建立Connection来保持连接,然后在Connection的基础上建立若干Channel信道,用来发送与接收消息
- Connection(连接)
使用mq首先就要与RabbitMQ建立连接,这个连接就是Connection。Connection是一个TCP长连接
- Channel(信道)
Channel是在Connection的基础上建立的虚拟连接,RabbitMQ中大部分的操作都是使用Channel完成的,比如:声明Queue、声明Exchange、发布消息、消费消息等
(为什么不Connection直接操作而要加上Channel,是为了支持多线程操作,每个线程创建单独的Channel进行通讯,如果没有Channel,那么每个线程在访问MQ时都要建立一个Connection这样的TCP连接.Connection与Channel之间的关系可以比作光纤电缆,如果把Connection比作一条光纤电缆,那么Channel就相当于是电缆中的一束光纤)
- Virtual host(虚拟主机)
一个Broker中可以有多个Virtual host,每个Virtual host都有一套自己的Exchange和Queue,同一个Virtual host中的Exchange和Queue不能重名
- Exchange(交换机)
比较重要的概念,它是消息到达RabbitMQ的第一站,主要负责根据不同的分发规则将消息分发到不同的Queue,供订阅了相关Queue的消费者消费到指定的消息。Exchange的4种分发消息的规则类型:direct(直连交换机)、fanout(扇形交换机)、topic(主题交换机)、headers(首部交换机)
在之前还需要清楚Routing key(路由键), 当我们创建好Exchange和Queue之后,需要使用Binding key将它们绑定起来,producer在向Exchange发送一条消息的时候,必须指定一个Routing key,然后Exchange接收到这条消息之后,会解析Routing key,然后根据Exchange和Queue的绑定规则,将消息分发到符合规则的Queue中
注意一下Routing key和Binding key并不是一个意思 :
- Routing key(路由键):当消息生产者发布消息到交换器(Exchange)时,会指定一个Routing key。RabbitMQ根据这个Routing key将消息路由到对应的队列(Queue)。路由键是消息的一个属性,它决定了消息如何从交换器路由到队列。
- Binding key(绑定键):当队列(Queue)和交换器(Exchange)之间建立绑定关系时,可以指定一个Binding key。绑定键定义了队列对哪些Routing key感兴趣。当消息的Routing key与队列的Binding key匹配时,消息就会被投递到这个队列。
简单来说,Routing key是发送消息时用到的,而Binding key是建立交换器和队列关系时用到的。在大多数情况下,Binding key和Routing key可能是相同的,但这不是必须的,它们可以不同,这取决于具体的绑定和路由规则
1. direct(直连交换机)
direct类型的Exchange会将消息转发到指定Routing key的Queue上,Routing key的解析规则为精确匹配, 也就是只有当producer发送的消息的Routing key与某个Binding key相等时,消息才会被分发到对应的Queue上
比如我们现在有一个direct类型的Exchange,它下面绑定了三个Queue,Binding key分别是ORDER/GOODS/STOCK:
然后我们向该Exchange中发送一条消息,消息的Routing key是ORDER:
按照规则分析,这条消息应该被路由到MY_EXCHANGE_ORDER_QUEUE这个Queue。消息发送成功之后,我们去Queues中查看,发现确实只有MY_EXCHANGE_ORDER_QUEUE这个QUEUE接收到了一条消息
进入这个队列,通过getMessage取出消息查看,确实是我们刚才手动发送的那条消息
2. fanout(扇形交换机)
fanout类型的Exchange不处理Routing key(自动忽略),而是会将发送给它的消息路由到所有与它绑定的Queue上
比如我们现在有一个fanout类型的Exchange,它下面绑定了三个Queue,Binding key分别是ORDER/GOODS/STOCK:
然后我们向该Exchange中发送一条消息,消息的Routing key随便填一个值abc:
按照规则分析,这条消息应该被路由到所有与该Exchange绑定的Queue,即三个Queue都应该会受到消息。消息发送成功之后,我们去Queues中查看,发现确实每个QUEUE都接收到了一条消息
fanout类型的Exchange不管Routing key是什么,它都会将接收到的消息分发给所有与自己绑定了的Queue上
3. topic(主题交换机)
topic类型的Exchange会根据通配符对Routing key进行匹配,只要Routing key满足某个通配符的条件,就会被路由到对应的Queue上。通配符的匹配规则如下:
(1) Routing key必须是一串字符串,每个单词用“.”分隔
(2) 符号“#”表示匹配一个或多个单词
(3) 符号“*”表示匹配一个单词
其实很好理解,和直连模式一样,不同的就是主题模式可以更具Routing key进行模糊匹配,而不像直连那样的精确匹配, 例如:“*.123” 能够匹配到 “abc.123”,但匹配不到 “abc.def.123”;“#.123” 既能够匹配到 “abc.123”,也能匹配到 “abc.def.123”
4. headers(首部交换机,了解即可)
headers Exchange中,Exchange与Queue之间的绑定不再通过Binding key绑定,而是通过Arguments绑定。比如我们现在有一个headers类型的Exchange,下面通过不同的Arguments绑定了三个Queue:
producer在发送消息时可以添加headers属性,Exchange接收到消息后,会解析headers属性,只要我们上面配置的Arguments中的所有属性全部被包含在Headers中并且值相等,那么这条消息就会被路由到对应的Queue中
比如我们向上面的Exchange中发送一条消息,消息的Headers中添加“x=1”:
根据规则,只有queue1这个队列满足x=1的条件,queue2中的y=2条件不满足,所以,消息应该只被路由到queue1队列中
如果我们再发送一条消息,消息的headers中有两个属性:x=1,y=2:
根据规则,queue1的x=1的条件满足,queue2的x=1、y=2的条件满足,queue3的y=2的条件满足,所以,这三个Queue应该都能够收到这条消息
- Queue(队列)
Queue是一个用来存放消息的队列,生产者发送的消息会被放到Queue中,消费者消费消息时也是从Queue中取走消息
消息分发机制: 一个生产者对应多个消费者,因此也多了两个分发模式可选择:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;
开始简单的使用
从springboot中使用rabbitmq开始,这里用direct直连模式作为参考, 如果在界面中已经把queueu 和 exchange的关系先绑定话,代码中就不需要在编写绑定配置了,直接方法中直接使用即可
消息生产者
1. 导入依赖
https://mvnrepository.com/artifact/com.rabbitmq/amqp-client (自行选择需要的版本)
2. 配置yml文件
# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 172.0.0.1
port: 5672
3. 配置config,声明交换机,队列,并进行绑定
@Configuration
public class DirectRabbitMqConfigration {
//1.声明注册direct模式的交换机
@Bean
public DirectExchange directExchange(){
//这里更具返回对象定义什么模式的交换机
return new DirectExchange("direct_exchange_order",true,false);
}
//2.声明队列
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
@Bean
public Queue directsmsQueue(){
return new Queue("sms.direct.queue",true);
}
@Bean
public Queue directduanxinQueue(){
return new Queue("duanxin.direct.queue",true);
}
@Bean
public Queue directemailQueue(){
return new Queue("email.direct.queue",true);
}
@Bean
public Queue directttlmessageQueue(){
return new Queue("email.directttlmessage.queue",true);
}
//3.完成绑定关系
@Bean
public Binding directsmsBinding(){
return BindingBuilder.bind(directsmsQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding directduanxinBinding(){
return BindingBuilder.bind(directduanxinQueue()).to(directExchange()).with("duanxin");
}
@Bean
public Binding directemailBinding(){
return BindingBuilder.bind(directemailQueue()).to(directExchange()).with("email");
}
}
4. 测试
@RequestMapping("rabbitmq")
@Controller
public class TestController {
@Resource
OrderService orderService;
@RequestMapping("/test1")
public void contextLoads(){
orderService.makeOrder("2","1",12);
}
}
@Service
public class OrderService implements RabbitTemplate.ConfirmCallback{
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback
}
//消息确认回调函数
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// System.err.println(correlationData.getId());
System.err.println(ack);
System.err.println(cause);
if(ack){
System.out.println("消息确认成功!!!!");
}else{
System.out.println("消息确认失败!!!!");
}
}
public void makeOrderDirect(String userId, String prodId, int num) {
//1.根据商品id查询库存是否充足
//2.保存订单
String orderID = UUID.randomUUID().toString();
System.err.println("订单生成成功:" + orderID);
//3.通过mq来完成消息的分发
String exchangeName = "direct_exchange_order";
String routingKey = "sms";//订阅模式没有路由key
String routingKey2 = "email";//订阅模式没有路由key
//参数:1.交换机,2.路由key,3.消息内容
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderID);
rabbitTemplate.convertAndSend(exchangeName, routingKey2, orderID);
}
}
消息消费者
这里可以使用两种方式,第一种就和生产者一样,新建一个配置类进行统一绑定管理,第二种则是类上注解中直接使用绑定使用即可
两种方式区别,第一种好处方便集中管理配置,第二种好处就是灵活,直观理解性更强
> 第一种
@Configuration
public class RabbitMqConfiguration {
@Bean
public DirectExchange directExchange() {
// 等价于 channel.exchangeDeclare(exchangeName,exchangeType,true);
return new DirectExchange("direct_order_ex", true, false);
}
@Bean
public Queue duanxinQueue() {
// 等价于 channel.exchangeDeclare(exchangeName,exchangeType,true);
return new Queue("duanxin.direct.queue");
}
@Bean
public Binding bindingDirect11() {
return BindingBuilder.bind(duanxinQueue()).to(directExchange()).with("");
}
@Bean
public Queue emailQueue() {
// 等价于 channel.exchangeDeclare(exchangeName,exchangeType,true);
return new Queue("email.direct.queue");
}
@Bean
public Queue smsQueue() {
// 等价于 channel.exchangeDeclare(exchangeName,exchangeType,true);
return new Queue("sms.direct.queue");
}
@Bean
public Queue weixinQueue() {
// 等价于 channel.exchangeDeclare(exchangeName,exchangeType,true);
return new Queue("weixin.direct.queue");
}
@Bean
public Binding bindingDirect14() {
return BindingBuilder.bind(weixinQueue()).to(directExchange()).with("weixin");
}
@Bean
public Binding bindingDirect12() {
return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding bindingDirect3() {
return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
}
}
@Service
@RabbitListener(queues = {"email.direct.queue"})
public class FanOutSMSConsumer {
@RabbitHandler
public void reviceMessage(String message){
System.err.println("fanout-sms--->接收到的消息是:"+message);
}
}
> 第二种
参考文章:[传送门]
// bindings其实就是用来确定队列和交换机绑定关系
1, value: @Queue 注解,用于声明队列,value 为 queueName, durable 表示队列是否持久化,autoDelete 表示没有消费者之后队列是否自动删除
2, exchange: @Exchange 注解,用于声明 exchange, type 指定消息投递策略,我们这里用的 topic 方式
3, key: 在 topic 方式下,这个就是我们熟知的 routingKey
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "email.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "fanout_order_exchange",
// 这里是确定的rabbitmq模式是:direct
type = ExchangeTypes.DIRECT),key = "sms"
))
@Component
public class EmailService {
// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
@RabbitHandler
public void messagerevice(String message){
// 此处省略发邮件的逻辑
System.out.println("email-------------->" + message);
}
}
功能机制介绍
持久化
持久化就把信息写入到磁盘的过程
上面在设置中也有持久化的参数说明,durable参数来实现,Durable为true时,队列才会持久化
过期时间
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置
- 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间
- 第二种方法是对消息进行单独设置,每条消息TTL可以不同
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息
第一种:
在配置绑定交换机队列时就设置
@Configuration
public class TTLRabbitMqConfigration {
//1.声明注册fanout模式的交换机
@Bean
public DirectExchange ttlExchange() {
return new DirectExchange("ttl_exchange_order", true, false);
}
//2.声明队列
@Bean
public Queue ttlQueue() {//设置过期时间
Map<String,Object> map = new HashMap();
map.put("x-message-ttl", 5000);
//设置队列长度(容量被限制为只能存储5条消息),超过的队列则会采取以下措施,这里是采用的第三种:
//1.阻塞生产者:默认情况下,如果队列已满,RabbitMQ将阻塞生产者,不再接受新的消息,直到队列中有足够的空间来接收新的消息
//2.丢弃消息:如果配置了x-max-length的同时还设置了x-overflow参数为drop-head或drop-tail,那么当队列满时,RabbitMQ会根据配置的策略丢弃队列中的消息。drop-head表示丢弃队列中最旧的消息,而drop-tail表示丢弃队列中最新的消息
//3.转发到死信交换机:如果队列绑定了死信交换机(Dead Letter Exchange, DLX),并且设置了x-dead-letter-exchange和x-dead-letter-routing-key参数,那么当队列满时,超出长度的消息将被转发到死信交换机
map.put("x-max-length", 5);
//以下是绑定死信队列,因为是路由模式,所以需要绑定路由key,如果是订阅模式则不需要
#map.put("x-dead-letter-exchange","Dead_exchange_order");
#map.put("x-dead-letter-routing-key","Dead");
return new Queue("ttl.fanout.queue", true,false,false,map);
}
//3.完成绑定关系
@Bean
public Binding ttlBinding() {
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
}
第二种:
public void makeOrderDirectTTL(String userId, String prodId, int num) {
//1.根据商品id查询库存是否充足
//2.保存订单
String orderID = UUID.randomUUID().toString();
System.err.println("订单生成成功:" + orderID);
//3.通过mq来完成消息的分发
String exchangeName = "direct_exchange_order";
String routingKey = "ttlmesaage";//
//参数:1.交换机,2.路由key,3.消息内容
//单独设置过期时间
MessagePostProcessor postProcessor=new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");//设置5秒过期
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderID,postProcessor);
}
消息确认机制
1. 消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息
2. 当autoAck 参数为 false 时,对于 RabbitMQ 服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果 RabbitMQ 服务器端一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
3. RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开,这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久
4. RabbitMQ 的 Web 管理平台上可以看到当前队列中的 “Ready” 状态和 “Unacknowledged” 状态的消息数,分别对应等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。如下图:
RabbitMQ 消息确认机制分为两大类:发送方确认、接收方确认
其中发送方确认又分为:生产者到交换器到确认、交换器到队列的确认。如下图:
发送方确认
ConfirmCallback 方法(是否到达Exchange)
ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
ReturnCallback 方法(是否到达队列)
通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调,该方法可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了
案例: 我们需要在生产者的yml配置中添加下面配置,表示开启发布者确认
# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 127.0.0.1
port: 5672
# 确认消息已发送到交换机(Exchange)/这里确认机制也分为三种方式[串行confirm模式][批量confirm模式][异步confirm模式],这里讲得异步模式,具体的参考传送门文章
publisher-confirm-type: correlated # 新版本(二选一)
publisher-confirms: true # 老版本(二选一)
#确认消息已发送到队列(Queue)
publisher-returns: true
// 1, 可以绑定交换机配置时放入ioc(二选一)
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory)
{
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
//消息进入到Exchange触发回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String id = Objects.requireNonNull(correlationData.getId());
System.out.println("\n确认消息送到交换机(Exchange)结果:");
System.out.println("相关数据:" + correlationData);
System.out.println("是否成功:" + ack);
System.out.println("错误原因:" + cause);
});
//消息未送达队列触发回调
rabbitTemplate.setReturnsCallback((ReturnedMessage returnedMessage) -> {
System.out.println("\n确认消息送到队列(Queue)结果:");
System.out.println("发生消息:" + returnedMessage.getMessage());
System.out.println("回应码:" + returnedMessage.getReplyCode());
System.out.println("回应信息:" + returnedMessage.getReplyText());
System.out.println("交换机:" + returnedMessage.getExchange());
System.out.println("路由键:" + returnedMessage.getRoutingKey());
});
return rabbitTemplate;
}
// 2. 可以当前消息发送对象中加载(二选一)
@PostConstruct
public void init() {
//消息进入到Exchange触发回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String id = Objects.requireNonNull(correlationData.getId());
System.out.println("\n确认消息送到交换机(Exchange)结果:");
System.out.println("相关数据:" + correlationData);
System.out.println("是否成功:" + ack);
System.out.println("错误原因:" + cause);
});
//消息未送达队列触发回调
rabbitTemplate.setReturnsCallback((ReturnedMessage returnedMessage) -> {
System.out.println("\n确认消息送到队列(Queue)结果:");
System.out.println("发生消息:" + returnedMessage.getMessage());
System.out.println("回应码:" + returnedMessage.getReplyCode());
System.out.println("回应信息:" + returnedMessage.getReplyText());
System.out.println("交换机:" + returnedMessage.getExchange());
System.out.println("路由键:" + returnedMessage.getRoutingKey());
});
}
消费者消息接收确认
RabbitMQ 消息确认机制(ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认, 但如果消费者收到消息后处理逻辑中出了问题, 导致了失败, 那边消息就会丢失
消息确认模式有:
- AcknowledgeMode.NONE:(默认)自动确认, 关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
- AcknowledgeMode.AUTO:根据情况确认, 自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- AcknowledgeMode.MANUAL:手动ack,需要在业务代码结束后,调用api发送ack, api如下, 消费者收到消息后,手动调用 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到这些消息后,才认为本次投递完成。
1. Basic.Ack 命令:用于确认当前消息
2. Basic.Nack 命令:用于否定当前消息(注意:这是AMQP 0-9-1的RabbitMQ扩展)
3. Basic.Reject 命令:用于拒绝当前消息
使用如下:
1,代码方式配置参考:
@Configuration
public class RabbitMqConfig
{
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private Receiver receiver; //消息接收处理类
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer()
{
//消费者数量,默认10
int DEFAULT_CONCURRENT = 10;
//每个消费者获取最大投递数量 默认50
int DEFAULT_PREFETCH_COUNT = 50;
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(DEFAULT_CONCURRENT);
container.setMaxConcurrentConsumers(DEFAULT_PREFETCH_COUNT);
// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置一个队列
container.setQueueNames("queue_name");
//如果同时设置多个如下: 前提是队列都是必须已经创建存在的
//container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
//container.setQueues(new Queue("TestDirectQueue",true));
//container.addQueues(new Queue("TestDirectQueue2",true));
//container.addQueues(new Queue("TestDirectQueue3",true));
container.setMessageListener(receiver);
return container;
}
}
2.yml配置方式参考:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: [none] [auto] [manual]
MANUAL(手动ack)
上面说到,手动确认的话需要根据情况去调用不同的api,分别如下:
basicAck 方法(确认当前消息)
basicAck 方法用于确认当前消息,Channel 类中的 basicAck 方法定义如下:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
参数说明:
long deliveryTag:唯一标识 ID,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。
boolean multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。
basicNack 方法(否定当前消息)
basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现,方法定义如下:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
参数说明:
long deliveryTag:唯一标识 ID
boolean multiple:上面已经解释
boolean requeue:如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。
basicReject 方法(拒绝当前消息)
basicReject 方法用于明确拒绝当前的消息而不是确认。 RabbitMQ 在 2.0.0 版本开始引入 Basic.Reject 命令,消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。
Channel 类中的basicReject 方法定义如下:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
参数说明:
long deliveryTag:唯一标识 ID
boolean requeue:上面已经解释
使用:
前提,配置acknowledge-mode改为manual
#消费者
@Component
public class Receiver implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
{
System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
/**
* 确认消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean multiple:是否批处理,当该参数为 true 时,
* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
*/
channel.basicAck(deliveryTag, true);
/**
* 否定消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean multiple:是否批处理,当该参数为 true 时,
* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
* boolean requeue:如果 requeue 参数设置为 true,
* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
* 而不会把它发送给新的消费者。
*/
//channel.basicNack(deliveryTag, true, false);
}
catch (Exception e)
{
e.printStackTrace();
/**
* 拒绝消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean requeue:如果 requeue 参数设置为 true,
* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
* 而不会把它发送给新的消费者。
*/
channel.basicReject(deliveryTag, true);
}
}
}
#设置监听多个队列,执行不同的消息接收确认
@Component
public class Receiver implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
{
if ("queue_name".equals(message.getMessageProperties().getConsumerQueue()))
{
System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
System.out.println("执行queue_name中的消息的业务处理流程......");
}
if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue()))
{
System.out.println("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue());
System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
System.out.println("执行fanout.A中的消息的业务处理流程......");
}
/**
* 确认消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean multiple:是否批处理,当该参数为 true 时,
* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
*/
channel.basicAck(deliveryTag, true);
/**
* 否定消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean multiple:是否批处理,当该参数为 true 时,
* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
* boolean requeue:如果 requeue 参数设置为 true,
* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
* 而不会把它发送给新的消费者。
*/
//channel.basicNack(deliveryTag, true, false);
}
catch (Exception e)
{
e.printStackTrace();
/**
* 拒绝消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean requeue:如果 requeue 参数设置为 true,
* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
* 而不会把它发送给新的消费者。
*/
channel.basicReject(deliveryTag, true);
}
}
}
AUTO(自动ack)
当换成aoto模式的时候, 代码出现异常消息, mq会进行重新投递, 但是重新投递会一直无限重试, 为了解决这种无限重试的问题, spring提供了retry(重试)机制, 使用这个机制我们就可以在消费者报错抛异常的时候, 利用本地的重试来解决这个问题, 如下所示:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
模拟一个失败的情况
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
log.info("消费者接收到simple.queue的消息:【" + msg + "】");
//该代码会报错异常, 数值溢出
System.out.println(1 / 0);
System.out.println("消费者处理消息成功!");
}
配置了失败等待的时间为1秒, 等待时间的倍数为2 , 重试次数最大为3 , 这样一来我们就会收到三次重试的信息
第一次和第二次的时间相差一秒(失败时间(1秒) x 失败倍数1 = 1秒),
第二次和第三次的时间相差两秒(失败时间(1秒) x 失败倍数2 = 2秒),
最后在到达最大重试次数就会停止重试, 但是这种方法的弊端就是当重试次数达到最大耗尽后, 消息回直接被丢弃, 解决方案:
MessageRecovery接口,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorMessageBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("e");
}
//失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "e");
}
}
当重试到最大值时,就会将消息转入到指定队列中
Republishing failed message to exchange 'error.direct' with routing key e
死信队列
参考文章:[传送门]
DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可,没有配置,消息就会被丢失
#上面也有案例,是一样的,只是再多绑定一个队列,也就是死信队列,消息死亡后会进入到这里(自定义)
@Configuration
public class RabbitMQConfig {
// 声明业务Exchange
@Bean
public TopicExchange businessExchange(){
return new TopicExchange("businessExchange");
}
// 声明业务队列A
@Bean
public Queue businessQueue(){
Map<String, Object> args = new HashMap<>();
// x-dead-letter-exchange(固定的写法) 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", "deadLetterExchange");
// x-dead-letter-routing-key(固定的写法) 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "dle.err");
return new Queue("businessQueue",true,false,false,args);
}
// 声明业务队列A绑定关系
@Bean
public Binding businessBinding(Queue businessQueue, TopicExchange businessExchange){
return BindingBuilder.bind(businessQueue).to(businessExchange).with("emp.*");
}
//声明死信Exchange
@Bean
public TopicExchange deadLetterExchange(){
return new TopicExchange("deadLetterExchange");
}
// 声明死信队列A
@Bean
public Queue deadLetterQueue(){
return new Queue("dle-queue");
}
@Bean
public Binding deadLetterQueueBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange){
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dle.*");
}
}
#消费者
@Component
public class DedaLetterListener {
// 监听业务队列
@RabbitListener(queues = "businessQueue")
public void businessQueue(String msg, Channel channel, Message message) throws IOException {
if ("error".equals(msg)) {
System.out.println("业务消费者出现问题:" + msg);
try {
throw new RuntimeException();
}catch (Exception e){
// 无法消费消息,nack
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
} else {
System.out.println("正常消费消息:" + msg);
// 正常消费了消息,手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
// 监听死信队列
@RabbitListener(queues = "dle-queue")
public void deadLetterQueue(String msg, Channel channel, Message message) throws IOException {
System.out.println("死信队列消费消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}