背景
为什么需要使用延迟队列?适用于什么场景? 场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。 这样类似的需求是我们经常会遇见的问题。最常用的方法是定期轮训数据库,设置状态。在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源。当面对千万级、上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来。通过使用延迟队列来解决这种问题
使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的Time To Live(TTL)和Dead Letter Exchanges(DLX),利用两者的组合来实现延迟队列 简述一下:A.消息的TTL就是消息的存活时间,B.DLX是死信路由 实现原理:先发送一个消息到队列中,设置存活时间,超时后会转发到死信路由中,客户端消费死信路由中的消息,消息中包装好需要转发的队列名,再根据此队列名发送消息,这样间接中转的方式实现了延迟队列。
实现 新建SpringBoot项目,添加 amqp 引用 1 2 3 4 5 6 7 8 9 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency>
在配置文件application.properties中配置好mq的连接地址 1 2 3 4 5 6 #rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=root spring.rabbitmq.password=root spring.rabbitmq.virtual-host=ykh_vhosts
创建配置类,使用配置文件中的连接 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 /** * 读取application.properties中的连接配置 */ @Configuration public class RabbitMQConfiguration { private static Logger logger = Logger.getLogger("RabbitMQConfiguration"); @Value("${spring.rabbitmq.host}") public String host; @Value("${spring.rabbitmq.port}") public int port; @Value("${spring.rabbitmq.username}") public String username; @Value("${spring.rabbitmq.password}") public String password; @Value("${spring.rabbitmq.virtual-host}") public String virtualHost; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); logger.info("Create ConnectionFactory bean .."); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } }
创建一个常量类,定义队列名称 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 /** * Rabbit消息队列相关常量 */ public final class MQConstant { private MQConstant(){} //exchange name public static final String DEFAULT_EXCHANGE = "ZyChange"; //TTL QUEUE public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "zy.dead.letter.queue"; //DLX repeat QUEUE 死信转发队列 public static final String DEFAULT_REPEAT_TRADE_QUEUE_NAME = "zy.repeat.trade.queue"; //Hello 测试消息队列名称 public static final String HELLO_QUEUE_NAME = "HELLO"; }
创建一个队列配置类,作用是信道配置,队列配置,队列绑定 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 /** * 队列配置,所有配置@Bean的队列名称,由系统启动时创建队列,并绑定到Exchane上 */ @Configuration public class QueueConfiguration { //信道配置 @Bean public DirectExchange defaultExchange() { return new DirectExchange(MQConstant.DEFAULT_EXCHANGE, true, false); } /********************* 业务队列定义与绑定 hello 测试 *****************/ @Bean public Queue queue() { Queue queue = new Queue(MQConstant.HELLO_QUEUE_NAME,true); return queue; } @Bean public Binding binding() { //队列绑定到exchange上,再绑定好路由键 return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE_NAME); } /********************* 业务队列定义与绑定 hello 测试 *****************/ //下面是延迟队列的配置 //转发队列 @Bean public Queue repeatTradeQueue() { Queue queue = new Queue(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME,true,false,false); return queue; } //绑定转发队列 @Bean public Binding drepeatTradeBinding() { return BindingBuilder.bind(repeatTradeQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME); } //死信队列 -- 消息在死信队列上堆积,消息超时时,会把消息转发到转发队列,转发队列根据消息内容再把转发到指定的队列上 @Bean public Queue deadLetterQueue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", MQConstant.DEFAULT_EXCHANGE); arguments.put("x-dead-letter-routing-key", MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME); Queue queue = new Queue(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME,true,false,false,arguments); return queue; } //绑定死信队列 @Bean public Binding deadLetterBinding() { return BindingBuilder.bind(deadLetterQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME); } }
创建消息生成接口和实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public interface IMessageService { /** * 发送消息到队列 * @param queueName 队列名称 * @param message 消息内容 */ public void send(String queueName,String message); /** * 延迟发送消息到队列 * @param queueName 队列名称 * @param message 消息内容 * @param times 延迟时间 单位毫秒 */ public void send(String queueName,String message,long times); } /** * 消息队列服务接口实现 */ @Service("messageService") public class MessageService implements IMessageService { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息到队列 * @param queueName 队列名称 * @param message 消息内容 */ @Override public void send(String queueName, String message) { rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,queueName, message); } /** * 延迟发送消息到队列 * @param queueName 队列名称 * @param message 消息内容 * @param times 延迟时间 单位毫秒 */ @Override public void send(String queueName, String message, long times) { //消息发送到死信队列上,当消息超时时,会发生到转发队列上,转发队列根据下面封装的queueName,把消息转发的指定队列上 //发送前,把消息进行封装,转发时应转发到指定 queueName 队列上 DLXMessage dlxMessage = new DLXMessage(MQConstant.DEFAULT_EXCHANGE,queueName,message,times); MessagePostProcessor processor = new MessagePostProcessor(){ @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(times + ""); return message; } }; rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME, JSON.toJSONString(dlxMessage), processor); } }
死信消息载体 DLXMessage是一个消息封装对象,很关键,发送延迟队列时,先把消息存在此对象中,在加上目的地队列名称,然后再发到死信队列中,当消息超时时,转发到转发队列,添加对转发队列的监听,消费转发队列,获取需要延迟发送的信息,该信息就是DLXMessage对象,这样就拿到了目的地队列名称,然后再发送一次消息,就完成了延迟队列的发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 /** * rabbit 死信消息载体 */ public class DLXMessage implements Serializable { private static final long serialVersionUID = 9956432152000L; private String exchange; private String queueName; private String content; private long times; public DLXMessage() { super(); } public DLXMessage(String queueName, String content, long times) { super(); this.queueName = queueName; this.content = content; this.times = times; } public DLXMessage(String exchange, String queueName, String content, long times) { super(); this.exchange = exchange; this.queueName = queueName; this.content = content; this.times = times; } public static long getSerialVersionUID() { return serialVersionUID; } public String getExchange() { return exchange; } public void setExchange(String exchange) { this.exchange = exchange; } public String getQueueName() { return queueName; } public void setQueueName(String queueName) { this.queueName = queueName; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public long getTimes() { return times; } public void setTimes(long times) { this.times = times; } }
添加消息消费者监听,当有消息时进行消费 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 //监听hello队列,有消息时进行消费 @Component @RabbitListener(queues = MQConstant.HELLO_QUEUE_NAME) public class ReceiverMessage { @RabbitHandler public void process(String content) { System.out.println("接受时间:"+ System.currentTimeMillis()); System.out.println("接受消息:" + content); } } //监听转发队列,有消息时,把消息转发到目标队列 @Component @RabbitListener(queues = MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME) public class ReceiverDelayMessage { @Autowired private IMessageService messageService; @RabbitHandler public void process(String content) { //此时,才把消息发送到指定队列,而实现延迟功能 DLXMessage message = JSON.parseObject(content, DLXMessage.class); messageService.send(message.getQueueName(), message.getContent()); } }
测试,启动项目,会执行发送消息代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 /** * 启动启动时执行 */ @Component public class SysInitLoad implements ApplicationRunner { @Autowired private IMessageService messageService; @Override public void run(ApplicationArguments args) throws Exception { System.out.println("发送时间:"+ System.currentTimeMillis()); String message = "测试延迟消息"; messageService.send(MQConstant.HELLO_QUEUE_NAME,message,6000); message = "测试普通消息"; messageService.send(MQConstant.HELLO_QUEUE_NAME,message); } }
普通消息马上就接收到了,延迟消息6s后收到。
以上就是订单超时未处理的解决方案,希望能够给予大家帮助。
欢迎关注获取更多资源