RabbitMQ通过TTL和DLX实现延时队列

   日期:2020-08-29     浏览:180    评论:0    
核心提示:RabbitMQ实现延时队列一、介绍1.TTL如何设置TTL(2种方式):2.Dead Letter Exchanges二、实现延时队列的思路三、SpringBoot+RabbitMQ实现延时队列1.RabbitMQConfig配置类2.消费者类3.生产者测试一、介绍RabbitMQ本身是没有直接支持延迟队列功能,但是可以通过TTL和DLX模拟出延迟队列的功能。 通过RabbitMQ的两个特性来曲线实现延迟队列:Time To Live(TTL) 和 Dead Letter Exchanges(

RabbitMQ实现延时队列

  • 一、介绍
    • 1.TTL
      • 如何设置TTL(2种方式):
    • 2.Dead Letter Exchanges
  • 二、实现延时队列的思路
  • 三、SpringBoot+RabbitMQ实现延时队列
    • 1.RabbitMQConfig配置类
    • 2.消费者类
    • 3.生产者
    • 测试

利用rabbitmq的延迟队列实现延迟(定时)任务
实现动态的定时任务许多同学的第一反应就是通过spring的schedule定时任务轮询数据库来实现,这种方案有一下几点劣势:
(1)消耗系统内存,由于定时任务一直在系统中占着进程,比较消耗内存
(2)增加了数据库的压力,这个提现在两方面,一是长时间占着数据库的连接,查询基数大
(3)存在较大的时间误差
但是我们利用第三方插件如rabbitmq来实现。

一、介绍

RabbitMQ本身是没有直接支持延迟队列功能,但是可以通过TTL和DLX模拟出延迟队列的功能。 通过RabbitMQ的两个特性来曲线实现延迟队列:Time To Live(TTL) 和 Dead Letter Exchanges(DLX)

1.TTL

TTL是MQ中一个消息或者队列的属性,表明一条消息或者队列中所有消息或者队列的最大存活时间,单位是毫秒。如果一条消息设置了TTL属性,或者进入了设置TTL的队列,如果这条消息在TTL内的时间未被消费则该条消息则变成死信,如果配置了消息的TTL和队列的TTL则较小的那个值会被使用。
消息的TTL才是实现延迟任务的关键。

如何设置TTL(2种方式):

1.一种是创建队列的时候设置队列的“x-message-ttl”属性。
2.另一种是针对每条消息设置TTL。

注意
第一种如果设置了队列的TTL,如果消息过期则被进入死信队列;
而第二种即使消息过期也不会马上被丢弃, 因为消息是否过期是在即将投递到消费者之前被判定的。此外,如果不设置TTL则表示消息永远不会过期,消息过期则变成死信。

2.Dead Letter Exchanges

DLX就是死信交换机。
一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
①.一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
②. 上面的消息的TTL到了,消息过期了。
③. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列(即延时队列)中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

注意
即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入库的顺序让消费者消费。如果第一进去的消息过期时间是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。

所以在考虑使用RabbitMQ来实现延迟任务队列的时候,需要确保业务上每个任务的延迟时间是一致的。如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

二、实现延时队列的思路

延时队列,就是想要消息延迟多久被处理。
TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就万事大吉了,因为里面的消息都是希望被立即处理的消息。

三、SpringBoot+RabbitMQ实现延时队列

1.RabbitMQConfig配置类

创建延时队列、死信队列、延时交换机、死信交换机
我们使用一个普通的队列来当做延时队列,设置TTL,当消息过期后会进入死信队列。

  1. 将延时队列与延时交换机进行绑定并设置路由key
  2. 将死信队列与私信交换机进行绑定bing设置路由key
  3. 并在延时队列中新建一个Map,设置当前队列绑定的死信交换机、当前队列的死信路由key、队列的TTL等参数。

视图:

 

    // 延时交换机
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange("delayExchange");
    }

    // 死信交换机
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange("deadLetterExchange");
    }

    
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue("deadLetterQueueA");
    }

    
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", "deadLetterExchange");
        // x-dead-letter-routing-key 这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", "deadLetterQueueAroutingkey");
        // x-message-ttl 声明队列的TTL 单位是毫秒 1000*6
        args.put("x-message-ttl", 6000);
        return QueueBuilder.durable("delayQueueA").withArguments(args).build();
    }


    
    @Bean
    public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("delayQueueAroutingkey");
    }

    
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("deadLetterQueueAroutingkey");
    }

2.消费者类

消费者直接监听死信队列即可!死信队列中的消息就是延时的消息。

@Component
@Slf4j
public class RabbitMQConsumer {

    @RabbitListener(queues = "deadLetterQueueA")
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg);
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

3.生产者

在控制层调用RabbitTemplate发送消息到延时队列。

@RestController
public class RabbitController {
    @Autowired
    RabbitTemplate rabbitTemplate;
     @GetMapping("/delayQueue")
    public void test(){
        rabbitTemplate.convertAndSend("delayExchange","delayQueueAroutingkey",LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        System.out.println("延时队列已发送");
    }
}

测试

测试成功

参考文章【RabbitMQ】一文带你搞定RabbitMQ延迟队列
这篇文章写得太好了!致敬!

 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服