RabbitMQ实现延时队列
- 一、介绍
- 1.TTL
- 如何设置TTL(2种方式):
- 2.Dead Letter Exchanges
- 二、实现延时队列的思路
- 三、SpringBoot+RabbitMQ实现延时队列
- 1.RabbitMQConfig配置类
- 2.消费者类
- 3.生产者
- 测试
利用rabbitmq的延迟队列实现延迟(定时)任务
实现动态的定时任务许多同学的第一反应就是通过spring的schedule定时任务轮询数据库来实现,这种方案有一下几点劣势:
(1)消耗系统内存,由于定时任务一直在系统中占着进程,比较消耗内存
(2)增加了数据库的压力,这个提现在两方面,一是长时间占着数据库的连接,查询基数大
(3)存在较大的时间误差
但是我们利用第三方插件如rabbitmq来实现。
一、介绍
RabbitMQ本身是没有直接支持延迟队列功能,但是可以通过TTL和DLX模拟出延迟队列的功能。 通过RabbitMQ的两个特性来曲线实现延迟队列:Time To Live(TTL) 和 Dead Letter Exchanges(DLX)
1.TTL
TTL是MQ中一个消息或者队列的属性,表明一条消息或者队列中所有消息或者队列的最大存活时间,单位是毫秒。如果一条消息设置了TTL属性,或者进入了设置TTL的队列,如果这条消息在TTL内的时间未被消费则该条消息则变成死信,如果配置了消息的TTL和队列的TTL则较小的那个值会被使用。
消息的TTL才是实现延迟任务的关键。
如何设置TTL(2种方式):
1.一种是创建队列的时候设置队列的“x-message-ttl”属性。
2.另一种是针对每条消息设置TTL。
注意:
第一种如果设置了队列的TTL,如果消息过期则被进入死信队列;
而第二种即使消息过期也不会马上被丢弃, 因为消息是否过期是在即将投递到消费者之前被判定的。此外,如果不设置TTL则表示消息永远不会过期,消息过期则变成死信。
2.Dead Letter Exchanges
DLX就是死信交换机。
一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
①.一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
②. 上面的消息的TTL到了,消息过期了。
③. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列(即延时队列)中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
注意:
即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入库的顺序让消费者消费。如果第一进去的消息过期时间是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。
所以在考虑使用RabbitMQ来实现延迟任务队列的时候,需要确保业务上每个任务的延迟时间是一致的。如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。
二、实现延时队列的思路
延时队列,就是想要消息延迟多久被处理。
TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就万事大吉了,因为里面的消息都是希望被立即处理的消息。
三、SpringBoot+RabbitMQ实现延时队列
1.RabbitMQConfig配置类
创建延时队列、死信队列、延时交换机、死信交换机
我们使用一个普通的队列来当做延时队列,设置TTL,当消息过期后会进入死信队列。
- 将延时队列与延时交换机进行绑定并设置路由key
- 将死信队列与私信交换机进行绑定bing设置路由key
- 并在延时队列中新建一个Map,设置当前队列绑定的死信交换机、当前队列的死信路由key、队列的TTL等参数。
视图:
// 延时交换机
@Bean("delayExchange")
public DirectExchange delayExchange(){
return new DirectExchange("delayExchange");
}
// 死信交换机
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange("deadLetterExchange");
}
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue("deadLetterQueueA");
}
@Bean("delayQueueA")
public Queue delayQueueA(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", "deadLetterExchange");
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "deadLetterQueueAroutingkey");
// x-message-ttl 声明队列的TTL 单位是毫秒 1000*6
args.put("x-message-ttl", 6000);
return QueueBuilder.durable("delayQueueA").withArguments(args).build();
}
@Bean
public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("delayQueueAroutingkey");
}
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("deadLetterQueueAroutingkey");
}
2.消费者类
消费者直接监听死信队列即可!死信队列中的消息就是延时的消息。
@Component
@Slf4j
public class RabbitMQConsumer {
@RabbitListener(queues = "deadLetterQueueA")
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
3.生产者
在控制层调用RabbitTemplate发送消息到延时队列。
@RestController
public class RabbitController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/delayQueue")
public void test(){
rabbitTemplate.convertAndSend("delayExchange","delayQueueAroutingkey",LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
System.out.println("延时队列已发送");
}
}
测试
测试成功
参考文章【RabbitMQ】一文带你搞定RabbitMQ延迟队列
这篇文章写得太好了!致敬!