• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RabbitMQ 延迟队列

武飞扬头像
尖笔尖
帮助1

业务需求

发送消息可能出现失败的情况,此时需要对消息进行重新发送,重新发送需要设置一定的延迟间隔,针对同一条消息的重试,间隔要随着重试次数线性增长(第一次间隔 3 秒,第二次间隔 10 秒,第三次 30 秒...)。

方案选择

传统的解决方案是利用消息过期变成死信来模拟实现的,即 死信队列 DLX TTL 的方案;而在 RabbitMQ 3.6.x 开始后,官方提供了延迟队列的插件 rabbitmq-delayed-message-exchange

DLX TTL

学新通

在实现上,过期时间可以设置在图示 delay 队列上或者 msg 消息上

  • 如果过期时间设置在消息上,就会因为时序问题形成队头阻塞现象。因为队列消息是按序消费的,如果队头的消息延迟时间是 10s, 后面的消息都要等至少 10s 后才可以进行消费;
  • 如果过期时间设置在队列上,所有发送到队列的消息延迟时间都是该队列设定值,而业务需求延迟时间是随着重试次数线性增长的,这样就需要创建很多个固定延迟时间的队列。

delay-message 插件

该插件可以很好的解决这个需求。

首先,它是将延迟时间设置在消息上的,这样只要创建一个队列即可;

其次,指定为延迟类型的交换机在接收到消息后并未立即将消息投递至目标队列中,而是存储在 mnesia (一个分布式数据系统)表中,检测消息延迟时间,在达到可投递时间时才投递至目标队列,这样就不存在队头阻塞现象。

1、定义 x-delayed-message

  1.  
    @Bean(name = "yourDelayExchangeName")
  2.  
    public TopicExchange exchange() {
  3.  
    TopicExchange exchange = new TopicExchange("yourDelayExchangeName", true, false);
  4.  
    exchange.setDelayed(true); // 这一行是重点,指定交换机类型
  5.  
    return exchange;
  6.  
    }

2、发送消息

  1.  
    public void sendDelayMessage(String exChange, int delayTimeS, String routeKey, String message) {
  2.  
    rabbitTemplate.convertAndSend(exChange, routeKey, message, new MessagePostProcessor() {
  3.  
    @Override
  4.  
    public Message postProcessMessage(Message message) throws AmqpException {
  5.  
    // 设置消息延迟时间,单位 ms
  6.  
    message.getMessageProperties().setHeader("x-delay", delayTimeS * 1000);
  7.  
    return message;
  8.  
    }
  9.  
    });
  10.  
    }

处理逻辑

学新通

 1. 待通知记录落表,状态记为初始态 I,重试次数设为1;

2. 异步将 待通知记录主键 作为信息发送到 MQ,生产端流程结束;

3. 消费端获取到消息,首先通过主键查询记录信息,检查状态是否为已经处理完成:

        3.1 如果已处理,直接返回(这里是为了处理重复消费的场景)。

        3.2 如果未处理完成,继续下一步;

4. 发送通知,判断处理结果:

        4.1 如果处理成功,更新状态为成功态 S;

        4.2 如果处理失败,将重试次数 1,检查重试次数是否已经达到重试最大次数:

                4.2.1 如果达到最大值,设置相应的延迟时间,更新状态为失败态 F;

                4.2.2 否则,根据重试次数,设置相应的延迟时间,将消息重新发送到 MQ;同时更新重试次数。

5. 消费完成, ack 应答。

异常处理

发送消息异常

步骤 2 可能出现消息发送 MQ 失败的情况,因此需要有自动任务对于这种异常记录作重发补偿。

消息处理异常

步骤 4 对消息处理可能涉及到和外部系统的交互,这就可能导致出现各种异常。

本方案通过将消息重新发送到 MQ 队列的方式来实现重试,同时处理了延迟时间随重试次数线性变换的需求。

ack 应答异常

步骤 5 消息处理完后要手动 ack 才能将消息从队列中移除,如果 ack 发生异常,下次消费就属于重复消费。

步骤 3.1 在每次处理前先检查当前记录状态,可以很好地解决重复消费的问题。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhhkifbk
系列文章
更多 icon
同类精品
更多 icon
继续加载