美文网首页
RabbitMQ 死信队列

RabbitMQ 死信队列

作者: SheHuan | 来源:发表于2021-06-01 21:55 被阅读0次

一、认识死信队列

首先了解一下什么是死信,官方将其翻译为单词Dead Letter。死信,其实这是 RabbitMQ 中一种消息类型,和普通的消息在本质上没有什么区别,更多的是一种业务上的划分。如果队列中的消息出现以下情况之一,就会变成死信:

  • 消息接收时被拒绝会变成死信,例如调用channel.basicNackchannel.basicReject ,并设置requeuefalse
  • 如果给消息队列设置了消息的过期时间(x-message-ttl),或者发送消息时设置了当前消息的过期时间,当消息在队列中的存活时间大于过期时间时,就会变成死信。
  • 如果给消息队列设置了最大容量(x-max-length),队列已经满了,后续再进来的消息会溢出,无法被队列接收就会变成死信。

如果不对死信做任何处理,则消息会被直接丢弃。一般死信都是那些在业务上未被正常处理的消息,我们可以考虑用一个队列来接收这些死信消息,接收死信消息的队列就是死信队列,它就是一个普通的消息队列,没有什么特殊的,只是我们在业务上赋予了它特殊的职责罢了,后期再根据实际情况处理死信队列中的消息即可。

二、准备工作

创建一个 SpringBoot 项目,添加 RabbitMQ 依赖,并添加需要的配置:

# rabbitmq 相关配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
# 设置消费者需要手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual

接下来创建一个死信队列、交换机,并完成绑定,这里的交换机也可以称作死信交换机,交换机的类型没有特殊的要求根据实际需求选择即可:

@Configuration
public class DeadLetterRabbitMQConfig {
    // 创建交换机
    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange("dead.letter.exchange", true, false);
    }

    // 创建死信队列
    @Bean
    Queue deadLetterQueue() {
        return new Queue("dead.letter.queue", true);
    }

    // 绑定队列和交换机
    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter");
    }
}

三、死信队列用法

这里我们根据文章开头描述的,正常消息变成死信的几种场景分别来看死信队列的用法。

1、消息被拒绝

首先创建处理业务消息的交换机、队列:

@Configuration
public class BusinessRabbitMQConfig {
    // 创建交换机
    @Bean
    DirectExchange businessExchange() {
        return new DirectExchange("business.exchange", true, false);
    }

    // 创建业务消息队列
    @Bean
    Queue businessQueue1() {
        HashMap<String, Object> args = new HashMap<>();
        // 设置死信交换机
        args.put("x-dead-letter-exchange", "dead.letter.exchange");
        // 设置死信交换机绑定队列的routingKey
        args.put("x-dead-letter-routing-key", "dead.letter");
        return new Queue("business.queue1", true, false, false, args);
    }

    @Bean
    Binding businessBinding1() {
        return BindingBuilder.bind(businessQueue1()).to(businessExchange()).with("business1");
    }
}

创建business.queue1时,我们给它配置了前边创建死信交换机、以及 routingKey,这样就完成了业务消息队列和死信队列的绑定,业务消息被拒绝后,就会进入死信队列。

注意,如果队列已经创建,之后再修改队列的配置参数,则不会生效,需要删除掉队列重新创建

接下来,创建消费者来消费business.queue1中的业务消息,为了突出效果,直接让消费者拒绝掉消息,不了解消息确认机制的可以翻阅之前的文章:

@Service
public class BusinessReceiveService {
    @RabbitListener(queues = "business.queue1")
    public void receive(String msg, Channel channel, Message message) {
        try {
            // 拒绝消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            System.out.println("拒绝的业务消息:" + msg);
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }
}

发送消息的服务很简单:

@Service
public class BusinessSendService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    public void send(String routingKey, String message) {
        rabbitTemplate.convertAndSend("business.exchange", routingKey, message);
        System.out.println("发送的业务消息:" + message);
    }
}

启动项目后,通过测试类发送一条消息:

@SpringBootTest
class DeadLetterApplicationTests {
    @Autowired
    BusinessSendService businessSendService;

    @Test
    void contextLoads() {
        String routingKey = "business1";
        String message = routingKey + "-data-" + System.currentTimeMillis();
        businessSendService.send(routingKey, message);
    }
}

按照预期,消息最终会流入死信队列。可以通过 RabbitMQ 的后台管理界面查看具体的效果:


2、消息过期

BusinessRabbitMQConfig中再添加一个business.queue2业务消息队列,设置队列中消息的过期时间为10秒,同样设置好死信队列:

@Bean
Queue businessQueue2() {
    HashMap<String, Object> args = new HashMap<>();
    // 设置队列中消息的过期时间,单位毫秒
    args.put("x-message-ttl", 10000);
    args.put("x-dead-letter-exchange", "dead.letter.exchange");
    args.put("x-dead-letter-routing-key", "dead.letter");
    return new Queue("business.queue2", true, false, false, args);
}

@Bean
Binding businessBinding2() {
    return BindingBuilder.bind(businessQueue2()).to(businessExchange()).with("business2");
}

不用给business.queue2配置消费者,重启项目,直接发送一条消息,让它自动过期即可:

String routingKey = "business2";
String message = routingKey + "-data-" + System.currentTimeMillis();
businessSendService.send(routingKey, message);

等待10秒后,消息会自动流入死信队列:

除了给队列设置消息的超时时间,也可以在发送消息时配置,有兴趣的可以自己尝试:

public void send2(String routingKey, String message) {
    MessagePostProcessor processor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        };
    rabbitTemplate.convertAndSend("business.exchange", routingKey, message);
    System.out.println("发送的业务消息:" + message);
}

3、消息溢出

由于消息队列满了,导致消息溢出而进入死信队列的场景也比较简单。

BusinessRabbitMQConfig中再添加一个business.queue3业务消息队列,设置队列的大小为10,同样设置好死信队列:

@Bean
Queue businessQueue3() {
    HashMap<String, Object> args = new HashMap<>();
    // 设置消息队列的大小
    args.put("x-max-length", 10);
    args.put("x-dead-letter-exchange", "dead.letter.exchange");
    args.put("x-dead-letter-routing-key", "dead.letter");
    return new Queue("business.queue3", true, false, false, args);
}

@Bean
Binding businessBinding3() {
    return BindingBuilder.bind(businessQueue3()).to(businessExchange()).with("business3");
}

business.queue3也不设置消费者,重启项目,发送15条消息:

for (int i = 0; i < 15; i++) {
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    String routingKey = "business3";
    String message = routingKey + "-data-" + System.currentTimeMillis();
    businessSendService.send(routingKey, message);
}

按照预期business.queue3最终会有10条消息,剩下的5条进入死信队列:

四、小结

关于死信队列的用法就介绍到这里了,还是很简单的。在一些重要的业务场景中,为了防止有些消息由于各种原因未被正常消费而丢失掉,可以考虑使用死信队列来保存这些消息,以方便后期排查问题使用,这样总比后期再去复现错误要简单的多。其实,延时队列也可以结合死信队列来实现,本文消息过期例子就是它的雏形,后边的文章我们再详细探讨。

本文完!

相关文章

  • 【深度知识】RabbitMQ死信队列的原理及GO实现

    1. 摘要 本文按照以下目前讲解RabbitMQ死信队列的内容,包括:(1)死信队列是什么?(2)如何配置死信队列...

  • rabbitmq延迟队列

    一、讲解RabbitMQ的的死信队列+ TTL 二、RabbitMQ的延迟队列和应⽤场景 1、简介 2、业界的⼀些...

  • RabbitMQ之认识死信队列一

    前言 RabbitMQ 有个队列叫死信队列,死信队列可以做蛮多事的,比如可以让消息半个小时后消费,规定每天几点钟消...

  • RabbitMQ消息中间件技术精讲17 高级篇十 死信队列

    死信队列介绍 本文是《RabbitMQ精讲系列》中第十七:RabbitMQ消息中间件技术精讲17 高级篇十 死信队...

  • RabbitMQ死信队列

    死信队列介绍 死信队列:DLX(dead-letter-exchange) 利用DLX,当消息在一个队列中变成死信...

  • RabbitMQ死信队列

    什么是死信队列 当发生以下任何事件,那么消息将成为死信 消费者使用basic.reject或 basic.nack...

  • 死信队列 (rabbitMQ)

    转载:http://www.imooc.com/article/283645 1.什么是死信队列 想必有些小伙伴应...

  • RabbitMQ 死信队列

    死信队列 "死信"模式 指的是,当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人...

  • RabbitMQ死信队列

    SpringBoot 是为了简化 Spring 应用的创建、运行、调试、部署等一系列问题而诞生的产物,自动装配的特...

  • rabbitmq 死信队列

    死信队列: DLX,dead-letter-exchange 利用 dlx,当消息在一个队列中变成死信 (dead...

网友评论

      本文标题:RabbitMQ 死信队列

      本文链接:https://www.haomeiwen.com/subject/jkzzjltx.html