RabbitMQ发布确认高级篇

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

1、发布确认 springboot 版本

确认机制

如果没有确认机制生产者只管着发消息,交换机挂了也不知道,发的消息都没了回复。但是加了缓存,也就是加一个消息的备份,交换机挂了缓存里还有一份,给缓存添加定时任务让它队未成功发送的消息重新发,交换机上线了就收到了。

image-20240303144358737

思路

image-20240303144418438

1.1、配置类ConfirmConfig

//发布确认高级篇
@Configuration
public class ConfirmConfig {
    //交换机名称
    public static final String CONFIRM_EXCHANGE = "confirm.exchange";
    //队列名称
    public static final String CONFIRM_QUEUE = "confirm.queue";
    //routingKey
    public static final String CONFIRM_KEY = "confirmKey";
    //声明队列
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }
    //声明交换机
    @Bean
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE);
    }
    //将其进行绑定
    @Bean
    public Binding confirmBind(@Qualifier("confirmQueue") Queue confirmQueue,
                               @Qualifier("confirmExchange") DirectExchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_KEY);
    }
}

1.2、生产者添加请求

//confirm发布确认正常发送测试
@GetMapping("/confirmMsg/{message}")
public void confirmMsg(@PathVariable String message){
    rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.CONFIRM_KEY,message);
    log.info("confirm发布确认正常测试发送消息:{}",message);
}

1.3、消费者接收

//发布确认
@Slf4j
@Component
public class ConfirmConsumer {
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
    public void receiveConfirm(Message message){
         String s = new String(message.getBody());
        log.info("收到了发给confirm.queue的消息:{}",s);
    }
}

启动测试一下正常的收发没有问题

image-20240303155633046

而假如说现在交换机挂了,那信息肯定就没了,在发布确认机制里生产者发给交换机,交换机收到了要确认收到消息了才能算发送成功,如果没确认或者确认的失败的那还算是发送失败。使用RabbitTemplate.ConfirmCallback回调接口进行消息的回调。

1.4、回调接口

//发布确认回调接口实现
@Slf4j
@Component
public class MyCallback implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init(){
        //注入
      rabbitTemplate.setConfirmCallback(this);
    };
    
    /*confirm各参数
    * 1、交换机接收到了消息,回调
    *   1.1、correlationData 保存回调消息的ID和相关信息
    *   1.2、布尔值就是是否成功收到消息,收到就是true
    *   1.3、cause为null,这是未成功收到消息的原因
    * 2、交换机没有收到消息,还是会回调
    *   2.1、correlationData 保存回调消息的ID和相关信息
    *   2.2、false
    *   2.3、cause中有失败原因
    * */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        //拿到消息ID
        String id = correlationData != null ? correlationData.getId() : "null";
        //根据ack判断有没有成功收到消息
        if (b) {
            log.info("交换机已经收到消息ID为:{}",id);
        }else {
            log.info("交换机还没有收到消息ID为:{}原因为:{}",id,s);
        }
​
    }
}

要想使用,还需要在配置文件中配置

spring.rabbitmq.host=192.168.111.28
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated

有三种选择:

  • NONE

    禁用发布确认模式,是默认值

  • CORRELATED

    发布消息成功到交换器后会触发回调方法

  • SIMPLE(单个确认)

    有两种效果,其一效果和 CORRELATED 值一样会触发回调方法, 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭channel,则接下来无法发送消息到 broker

现在将生产者添加一些CorrelationData对象放入消息ID

//confirm发布确认发送测试
@GetMapping("/confirmMsg/{message}")
public void confirmMsg(@PathVariable String message){
    CorrelationData correlationData = new CorrelationData("1");
    rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.CONFIRM_KEY,message,correlationData);
    log.info("confirm发布确认正常测试发送消息:{}",message);
}

测试一下,正常情况下,回调函数的触发

image-20240303174009412

想要不正常模拟交换机挂掉了,就把交换机的名字写错就好了,测试一下:

//confirm发布确认发送测试
@GetMapping("/confirmMsg/{message}")
public void confirmMsg(@PathVariable String message){
    CorrelationData correlationData = new CorrelationData("1");
    rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE+"666",ConfirmConfig.CONFIRM_KEY,message,correlationData);
    log.info("confirm发布确认正常测试发送消息:{}",message);
}

非正常的情况下还是会进行回调的,返回的消息就是报错内容

image-20240303174419985

刚才演示的是交换机挂了,现在模拟交换机没挂但是队列挂了,我们可以一次性发两个消息,第一个是正常的消费者能接收到,第二个把routingKey改了就相当于队列挂了,看结果如何。

//confirm发布确认发送测试
@GetMapping("/confirmMsg/{message}")
public void confirmMsg(@PathVariable String message){
    CorrelationData correlationData = new CorrelationData("1");
    rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,
            ConfirmConfig.CONFIRM_KEY,message+"正常的",correlationData);
    CorrelationData correlationData1 = new CorrelationData("2");
    rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,
            ConfirmConfig.CONFIRM_KEY+"k2",message+"队列没了的",correlationData1);
    log.info("confirm发布确认正常测试发送消息:{}",message);
}

image-20240303175014489

结果是交换机确实两个消息都收到了,但是消费者只收到了第一条消息,第二条没有收到,交换机两个消息确实都收到了回调是正常的回调消息。但是,消费者没收到消息好像都得进行回调的吧,应该让队列收不到信息也能进行失败的回调才对。

1.5、回退消息

1.5.1、Mandatory 参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如 果发现该消息不可路由(就是无法发给队列),那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何 让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参 数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

1.5.2、配置文件添加配置

添加开启回退功能

spring.rabbitmq.publisher-returns=true

1.5.3、回退接口

//发布确认回调接口实现
@Slf4j
@Component
public class MyCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init(){
        //注入
      rabbitTemplate.setConfirmCallback(this);
      rabbitTemplate.setReturnCallback(this);
    };
​
    /*confirm各参数
    * 1、交换机接收到了消息,回调
    *   1.1、correlationData 保存回调消息的ID和相关信息
    *   1.2、布尔值就是是否成功收到消息,收到就是true
    *   1.3、cause为null,这是未成功收到消息的原因
    * 2、交换机没有收到消息,还是会回调
    *   2.1、correlationData 保存回调消息的ID和相关信息
    *   2.2、false
    *   2.3、cause中有失败原因
    * */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        //拿到消息ID
        String id = correlationData != null ? correlationData.getId() : "null";
        //根据ack判断有没有成功收到消息
        if (b) {
            log.info("交换机已经收到消息ID为:{}",id);
        }else {
            log.info("交换机还没有收到消息ID为:{}原因为:{}",id,s);
        }
​
    }
    //通过设置 mandatory 参 数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
    //只有不可达目的地时才能回退
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息:{},被交换机:{}退回了,原因是:{},路由KEY:{}",
                new String(message.getBody()),exchange,replyText,routingKey);
    }
}

启动服务测试,还是原来的请求

image-20240303200013801

发现被消息被退掉了

1.6、备份交换机

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息 无法被投递时发现并处理。

但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。

如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

我们可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。

在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?

备份交换机可以理解为RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警,这么一看来还有点像死信队列似的

image-20240303201002880

原来的交换机已经是好的了,现在就差应该备份交换机,关键是原交换机怎么和备份交换机建立联系,应该给原交换机构建的时候加参数

1.6.1、修改配置类

ConfirmConfig.java

//发布确认高级篇
@Configuration
public class ConfirmConfig {
    //交换机名称
    public static final String CONFIRM_EXCHANGE = "confirm.exchange";
    //备份交换机
    public static final String BACKUP_EXCHANGE = "backup.exchange";
    //队列名称
    public static final String CONFIRM_QUEUE = "confirm.queue";
    //备份队列
    public static final String BACKUP_QUEUE = "backup.queue";
    //报警队列
    public static final String WARNING_QUEUE = "warning.queue";
    //routingKey
    public static final String CONFIRM_KEY = "confirmKey";
    //声明队列
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }
    //声明交换机
    //添加参数,将备份交换机给原确认交换机选择,给它当备胎
    @Bean
    public DirectExchange confirmExchange(){
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true)
                .withArgument("alternate-exchange",BACKUP_EXCHANGE).build();
    }
    //将其进行绑定
    @Bean
    public Binding confirmBind(@Qualifier("confirmQueue") Queue confirmQueue,
                               @Qualifier("confirmExchange") DirectExchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_KEY);
    }
    //声明报警队列和备份队列
    @Bean("backQueue")
    public Queue backQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE).build();
    }
    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE).build();
    }
    //声明备份交换机
    @Bean("backExchange")
    public FanoutExchange backExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE);
    }
    //备份和警告队列绑定备份交换机
    @Bean
    public Binding backBinding(@Qualifier("backQueue") Queue backQueue,
                               @Qualifier("backExchange") FanoutExchange backExchange){
        return BindingBuilder.bind(backQueue).to(backExchange);
    }
    @Bean
    public Binding warningBinding(@Qualifier("warningQueue") Queue warningQueue,
                               @Qualifier("backExchange") FanoutExchange backExchange){
        return BindingBuilder.bind(warningQueue).to(backExchange);
    }
​
}

1.6.2、报警消费者

//报警消费者
@Slf4j
@Component
public class WarningConsumer {
    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE)
    public void receiveWarning(Message message){
        String s = new String(message.getBody());
        log.info("这里是报警消费者,收到了一个报警消息:{}",s);
    }

生产者还是使用之前的有故意写错的生产者请求,在测试之前需要把原来的交换机删除,重新创建才有和备份交换机连接。

启动测试

image-20240304095046790

我们发现报警信息是有的,但是按理说,报警信息没有进正常confirm.queue的队列之前的回调会被触发呀,但是结果并没有。

那是因为mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,备份交换机优先级高。

2、幂等性

2.1、概念

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了多次消费。

举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常, 此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱 了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误 立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等

2.2、消息重复消费

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

2.3、解决方法

MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 ID 来判断,或者可按自己的规则生成一个全局唯一 ID,每次消费消息时用该 ID先判断该消息是否已消费过。

2.4、消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性, 这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。

业界主流的幂等性有两种操作:

  • 唯一 ID+指纹码机制,利用数据库主键去重,

  • 利用 redis 的原子性去实现(一般就是这种方法setnx)

2.5、唯一 ID+指纹码机制

指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基 本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存 在数据库中。

  • 优势就是实现简单就一个拼接,然后查询判断是否重复;

  • 劣势就是在高并发时,如果是单个数 据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但并不推荐这种方法。

2.6、Redis原子性

利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费。

redis的分布式锁 setnx(UUID,value) 还可以加个自动过期时间 ,每次进来先判断get(UUID)有没有值,有值就返回,没有值再走正常逻辑。

3、优先级队列(0-255)

有一个场景:

在商城系统中有订单催付的场景,我们在淘宝下个单,马上就有订单出来推送给我们,如果用户在规定时间内或者之前多长时间还没支付订单,那淘宝该给我们发消息了。那淘宝的商家也分大小客户呢,像数码产品比较贵重的华为、苹果、小米他们的订单应该得到优先的处理吧,他们带来的利润那么大当然优先处理。

之前后端系统使用redis来存放定时轮询,而redis只能使用List做一个简单的消息队列,现在可以用linsert命令在列表中间插入消息实现消息的优先级设定,但专业的事情还是要专业的人来做嘛,订单量大了采用RabbitMQ进行改造和优化,大客户就是高优先级,其他的可以默认优先级。

3.1、添加优先级

3.1.1、在控制台界面添加

image-20240304105316714

3.1.2、队列中代码添加优先级

HashMap<String, Object> arg = new HashMap<>();
arg.put("x-max-priority",10);//此处是设置队列中的消息最大优先级是10,不要设置太大,浪费CPU
channel.queueDeclare(QUEUE_NAME,false,false,false,arg);

官方是0-255,在队列中设置的优先级是消息的最大优先级,10个以内好排序,太大浪费CPU

3.1.3、消息中代码添加优先级

发10条消息

//发10个消息
for (int i = 1; i < 11; i++) {
    String message = "msg" + i;
    if ( i == 5){
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
        channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());
    }else channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
}

发送消息后再启动消费者看看输出顺序

image-20240304111740030

第五条消息确实是第一个被输出的,它优先级高。

以上是普通的最原始的方式演示的优先级案例,下面我们看一下怎么在springboot项目中演示

3.1.4、springboot中优先级队列演示

首先再开一个交换机和队列

//优先级演示
@Configuration
public class PriorityConfig {
    //交换机
    public static final String PRIORITY_EXCHANGE = "priorityExchange";
    //队列
    public static final String PRIORITY_QUEUE = "priorityQueue";
    //路由key
    public static final String PRIORITY_KEY = "priorityKey";
    //声明队列
    @Bean
    public Queue priorityQueue(){
        //maxPriority为队列设置优先级,设置的其实是队列中消息的最大优先值
        return QueueBuilder.durable(PRIORITY_QUEUE).maxPriority(10).build();
    }
    //声明交换机
    @Bean
    public DirectExchange priorityExchange(){
        return new DirectExchange(PRIORITY_EXCHANGE);
    }
    //绑定
    @Bean
    public Binding priorityBinding(@Qualifier("priorityQueue") Queue priorityQueue,
                                   @Qualifier("priorityExchange") DirectExchange priorityExchange){
        return BindingBuilder.bind(priorityQueue).to(priorityExchange).with(PRIORITY_KEY);
    }
}

然后到controller中再加一个请求

//优先级测试,为5号信息设置高一点的优先级
@GetMapping("/priority")
public void priority(){
    //发送10个消息,给5号设置优先级
    for (int i = 1; i < 11; i++) {
        String msg = "priority:"+i;
        MessageProperties properties = new MessageProperties();
        if (i == 5){
            properties.setPriority(5);//在这里设置低5条消息的优先级为5
            Message message = new Message(msg.getBytes(), properties);
            rabbitTemplate.convertAndSend(PriorityConfig.PRIORITY_EXCHANGE,PriorityConfig.PRIORITY_KEY
            ,message);
        }else rabbitTemplate.convertAndSend(PriorityConfig.PRIORITY_EXCHANGE,PriorityConfig.PRIORITY_KEY
                ,msg);
    }
    log.info("发送消息完成!!!");
}

为了方便,我就不搞什么消息堆积什么让消费者睡一会儿再消费了,我直接借用最原始的方式,等消费者发完,过一会儿我再开启消费者进行消费,这样队列里的消息就应该已经按照预期排好了,直接消费即可。至于为什么不把消费者也使用springboot,因为消费者打上@Componet组件后程序启动它也初始化,消费者直接就把消息给消费了,所以还是手动吧,还是挺方便的。

消费者:

public class PriorityConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.111.28");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明接收消息回调
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("消息内容:"+new String(message.getBody()));
        };
        //声明取消消息回调
        CancelCallback callback = consumerTag ->{
            System.out.println("消息消费被中断");
        };
        channel.basicConsume(PriorityConfig.PRIORITY_QUEUE,true,deliverCallback,callback);
    }
}

那么现在启动服务,浏览器输入localhost:8080/priority生产者发送完消息

image-20240304152401231

好的选择启动消费者

image-20240304152436128

OK,是我们想的那样,第一条消息就是5了。

4、惰性队列

4.1、使用场景

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消 费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持 更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致 使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的 时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。

  • 一般情况下消息是在内存;

  • 惰性队列中消息保存在磁盘;

4.2、两种模式

队列具备两种模式:defaultlazy

默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。

lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。 在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下

演示:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

在控制台中可视化添加队列时可以设置队列参数为惰性队列

image-20240304131100090

4.3、内存开销对比

image-20240304130828814

在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅 占用 1.5MB