RabbitMQ 延迟队列取消订单

2018-06-09 17:42:26
1046次阅读
0个评论
问题描述
在订单系统,当用户下单后需要在10分钟内完成支付,否则取消订单。



解决方案
如果我们使用定时任务来做,那这个失效时间对不准确,当时可以提高定时任务的执行频率来减小这个误差。

使用延迟队列,我们这里主要将这种方式。


基本概念

所谓的‘延迟队列“就是消息被发送以后,不直接被消费者消费,而是等到特定时间后消费者才能拿到消息消费。


详细说明
RabbitMQ本身不支持延迟队列,但是我们可以使用死信队列(DLX)和设置有效时间(TTL)两个特性来实现延迟队列。

先新建队列order_query并设置消息有效时间是10分钟,然后绑定一个死信队列order_dead_query,消费者消费order_dead_query队列的消息。生成订单的时候往队列order_query发一条消息,当10分钟后这条消息会进入死信队列order_dead_query里面并被我们消费者消费,这时我们去查询一下该订单的支付状态,如果是已支付不做任何操作,如果是未支付就取消订单。


声明队列 RabbitConfig


/**
 * RabbitMQ 配置类
 */
@Configuration
public class RabbitConfig {

    /**
     * 方法rabbitAdmin的功能描述:动态声明queue、exchange、routing
     *
     * @param connectionFactory
     * @return
     * @author : yuhao.wang
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        //声明死信队列(Fanout类型的exchange)
        Queue deadQueue = new Queue(RabbitConstants.QUEUE_NAME_DEAD_QUEUE);
        // 死信队列交换机
        FanoutExchange deadExchange = new FanoutExchange(RabbitConstants.MQ_EXCHANGE_DEAD_QUEUE);
        rabbitAdmin.declareQueue(deadQueue);
        rabbitAdmin.declareExchange(deadExchange);
        rabbitAdmin.declareBinding(BindingBuilder.bind(deadQueue).to(deadExchange));

        // 发放奖励队列交换机
        DirectExchange exchange = new DirectExchange(RabbitConstants.MQ_EXCHANGE_SEND_AWARD);

        //声明发送优惠券的消息队列(Direct类型的exchange)
        Queue couponQueue = queue(RabbitConstants.QUEUE_NAME_SEND_COUPON);
        rabbitAdmin.declareQueue(couponQueue);
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareBinding(BindingBuilder.bind(couponQueue).to(exchange).with(RabbitConstants.MQ_ROUTING_KEY_SEND_COUPON));

        return rabbitAdmin;
    }

    public Queue queue(String name) {
        Map<String, Object> args = new HashMap<>();
        // 设置死信队列
        args.put("x-dead-letter-exchange", RabbitConstants.MQ_EXCHANGE_DEAD_QUEUE);
        args.put("x-dead-letter-routing-key", RabbitConstants.MQ_ROUTING_KEY_DEAD_QUEUE);
        // 设置消息的过期时间, 单位是毫秒
        args.put("x-message-ttl", 5000);

        // 是否持久化
        boolean durable = true;
        // 仅创建者可以使用的私有队列,断开后自动删除
        boolean exclusive = false;
        // 当所有消费客户端连接断开后,是否自动删除队列
        boolean autoDelete = false;
        return new Queue(name, durable, exclusive, autoDelete, args);
    }
}
设置消息的过期时间, 单位是毫秒 
args.put(“x-message-ttl”, 5000);

消费者消费死信队列 DeadMessageListener



/**
 * 延迟队列消费
 */
@Service
public class DeadMessageListener {

    private final Logger logger = LoggerFactory.getLogger(DeadMessageListener.class);

    @RabbitListener(queues = RabbitConstants.QUEUE_NAME_DEAD_QUEUE)
    public void process(SendMessage sendMessage, Channel channel, Message message) throws Exception {
        logger.info("[{}]处理延迟队列消息队列接收数据,消息体:{}", RabbitConstants.QUEUE_NAME_SEND_COUPON, JSON.toJSONString(sendMessage));

        System.out.println(message.getMessageProperties().getDeliveryTag());

        try {
            // 参数校验
            Assert.notNull(sendMessage, "sendMessage 消息体不能为NULL");

            // TODO 处理消息

            // 确认消息已经消费成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            logger.error("MQ消息处理异常,消息体:{}", message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage), e);

            try {
                // TODO 保存消息到数据库

                // 确认消息已经消费成功
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception dbe) {
                logger.error("保存异常MQ消息到数据库异常,放到死性队列,消息体:{}", JSON.toJSONString(sendMessage), dbe);
                // 确认消息将消息放到死信队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }
}
收藏00

登录 后评论。没有帐号? 注册 一个。

admin

官方人员
  • 0回答
  • 0粉丝
  • 0关注