美文网首页
rabbitmq消息确认机制

rabbitmq消息确认机制

作者: 归来依旧少女 | 来源:发表于2019-06-16 21:08 被阅读0次

一、事务

RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback()。txSelect()开启事务,生产者发送消息内容给mq,这是一阶段提交。然后本地可以继续处理自己的业务逻辑,处理完提交事务,就发送提交事务的消息给mq,mq就可以直接后续处理了。如果本地处理有问题,回滚本地业务,发送一个回滚事务的消息给mq,mq就知道这条消息作废了,进行回滚,不进行后续的操作了。
但事务机制是一个同步的过程,效率相对较低,如果对数据一致性要求很高的话可以使用事务机制。

二、ack消息确认

rabbitmq的confirm模式是异步的,所以相对效率会高很多。

1.rabiitmq消息确认分为两种:

1.发送消息的确认。分为消息发送到交换机的确认、消息发送到队列的确认

2.接收消息的确认。

2.springboot集成rabiitmq的确认模式:

acknowledgeMode有三值:

A、NONE = no acks will be sent (incompatible with channelTransacted=true).

      RabbitMQ calls this "autoack" because the broker assumes all messages are acked without any action from the consumer.

B、MANUAL = the listener must acknowledge all messages by calling Channel.basicAck().

C、AUTO = the container will acknowledge the message automatically, unless the MessageListener throws an exception.

简单来说也就是:

none:不确认,不会发送任何ack

manual:手动确认,发送端和客户端都需要手动确认

auto:自动确认,就是自动发ack,除非抛异常。

3.代码

配置:

@Configuration
public class MqConsumerConfig {

    public final static String QUEUE_ACK_NAME = "orderme-queue.yannic.ack";

    public static final String ORDER_WEBSOCKET_EXCHANGE = "orderme.yannic.websocket";
    @Bean(name="orderTopicAckQueue")
    public Queue orderTopicAckQueue() {
        return new Queue(QUEUE_ACK_NAME);
    }

    @Bean(name = "orderWebSocketExchange")
    public TopicExchange orderWebSocketExchange() {
        return new TopicExchange(ORDER_WEBSOCKET_EXCHANGE);
    }

    @Bean
    Binding bindingExchangeAckMessage(@Qualifier("orderTopicAckQueue") Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("yannic.*");
    }

    /**
     * 定制化amqp模版
        * connectionFactory:包含了yml文件配置参数
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 必须设置为 true,不然当 发送到交换器成功,但是没有匹配的队列,不会触发 ReturnCallback 回调
        // 而且 ReturnCallback 比 ConfirmCallback 先回调,意思就是 ReturnCallback 执行完了才会执行 ConfirmCallback
        rabbitTemplate.setMandatory(true);
        // 设置 ConfirmCallback 回调   yml需要配置 publisher-confirms: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            // 如果发送到交换器都没有成功(比如说删除了交换器),ack 返回值为 false
            // 如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true (这是一个坑,需要注意)
            if (ack) {
                String messageId = correlationData.getId();
                System.out.println("confirm:"+messageId);
            }
        });
        // 设置 ReturnCallback 回调   yml需要配置 publisher-returns: true
        // 如果发送到交换器成功,但是没有匹配的队列,就会触发这个回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                                          exchange, routingKey) -> {
            String messageId = message.getMessageProperties().getMessageId();
            System.out.println("return:"+messageId);
        });
        return rabbitTemplate;
    }

}

发送端:

    /**
     * 发送信息确认ack
     * @param exchange
     * @param routingKey
     * @param object
     */
    public void sendMessageAck(String exchange, String routingKey, Object object) {
        logger.info("mq消息发送开始===》");
        try {
            //CorrelationData用于confirm机制里的回调确认
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(exchange,routingKey,JSON.toJSONString(object),correlationData);
            logger.info("mq消息发送结束==》{}", object);
        } catch (Exception e) {
            logger.error(String.format("mq 发送 %s 的数据  %s 异常", exchange, object), e);
        } finally {

        }
    }

消费端:

    /**
     * 手动确认ack
     * @param msg
     */
    @RabbitListener(queues = MqConsumerConfig.QUEUE_ACK_NAME)
    public void consumeTopicAckMessage(Message msg, Channel channel) {
        logger.info("接收的消息为:{}",msg.getBody());
        try {
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            logger.error("接收mq消息失败:{}",msg);
        }
    }

4.深入思考

生产者发送消息给mq,遇到网络抖动或者mq这时候宕机了,没有收到mq的ack怎么办?
方案一:就是事务控制咯。这个就是效率慢,rabiitmq的事务与confirm不能同时使用.
方案二:生产者这边业务控制。比如生产者每次发消息之前先把消息保存到本地,如果收到ack就把这个消息给删除,没有收到就隔一段时间重试,最多重试个3次,还是没收到就把这个消息登记起来后续处理,不再发送了。

相关文章

网友评论

      本文标题:rabbitmq消息确认机制

      本文链接:https://www.haomeiwen.com/subject/yeiwfctx.html