美文网首页
rabbitMQ基本(消息确认 对象传输)

rabbitMQ基本(消息确认 对象传输)

作者: dwwl | 来源:发表于2019-10-08 17:06 被阅读0次

RabbitMQ简单使用:

参考:https://www.jianshu.com/p/dca01aad6bc8
(springboot + rabbitmq发送邮件(保证消息100%投递成功并被消费))

对于RabbitMQ进行对象传输的情景,添加如下的配置,注意实体类实现序列化接口

发送端:

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

接收端:

@Bean
public MessageConverter messageConverter() {
    return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter());
}

这里的简单demo由消息发送端和消息消费端构成,分别对应一个工程,实现消息确认和对象发送 接收。

消息确认消费总体来说,就是发送端确认两点:1.成功发送到ExChange 2.成功由Exchange发送到Queue。消费端确认一点:在执行完所有的逻辑后,通过channel.basicAck确认已经消费

1570522744531.png

application.properties

server.port=9000
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#Whether to enable publisher confirms
spring.rabbitmq.publisher-confirms=true
#Whether to enable publisher returns
spring.rabbitmq.publisher-returns=true
package com.myrabbit.simplesender;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author huangQiChang
 * 对RabbitTemplate进行定制,设置jsckson 成功发送到exChange的回调函数 exChange发送到queue失败的回调函数
 */
@Configuration
public class RabbitMQConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

// 消息是否成功发送到Exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println(correlationData);
            if (ack) {
                System.out.println("收到ack");
            }
        });
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setMandatory(true);

// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println(String.format("===%s===", "returnCallBack"));
            System.out.println("message = " + message);
            System.out.println("replyCode = " + replyCode);
            System.out.println("replyText = " + replyText);
            System.out.println("exchange = " + exchange);
            System.out.println("routingKey = " + routingKey);
        });
        return rabbitTemplate;
    }
}

package com.myrabbit.simplesender;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author huangQiChang
 * 设置交换机 消息队列 以及相关的绑定信息
 */
@Configuration
public class RabbitMQComponentConfig {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("myEx");
    }

    @Bean
    public Queue queue() {
        return new Queue("myQ.test");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(topicExchange()).with("myQ.*");
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("myDire",false,true);
    }

    @Bean
    public Queue queue_not_in() {
        return new Queue("myDire.notIn");
    }

    @Bean
    public Binding directExchange_queue_not_in_binding() {
        return BindingBuilder.bind(queue_not_in()).to(directExchange()).with("myDire.notIn");
    }
}

package com.myrabbit.simplesender.sender;

import com.myrabbit.simplesender.pojo.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author huangQiChang
 * @date 2019/10/8
 */
@Component
public class QueueSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

//    测试topic模式
    public void send() {
        rabbitTemplate.convertAndSend("myEx", "myQ.t", "this is a test msg");
    }
//    测试direct模式,没queue匹配的情况
    public void sendDirect() {
        rabbitTemplate.convertAndSend("myEx", "qwe", "direct msg test");
    }

    public void sendObj() {
        User user = new User();
        user.setUserName("huangSir");
        user.setDesc("user's desc");
        rabbitTemplate.convertAndSend("myEx","myQ.test",user);
    }
}

消息消费端

application.properties

spring.rabbitmq.listener.simple.acknowledge-mode=manual
@Configuration
public class RabbitConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter());
    }
}

package com.mpt.simplelistener.listener;

import com.mpt.simplelistener.pojo.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author huangQiChang
 * @date 2019/10/8
 */
@Component
public class MyListener {

    @RabbitListener(queues = "myQ.test")
    public void consume(Message msg, Channel channel, User user) throws IOException {
        System.out.println("user = " + user);

        MessageProperties properties = msg.getMessageProperties();
//        发布的每一条消息都会获得一个唯一的deliveryTag,
//        (任何channel上发布的第一条消息的deliveryTag为1,此后的每一条消息都会加1),
//        deliveryTag在channel范围内是唯一的
        long tag = properties.getDeliveryTag();
        System.out.println("tag = " + tag);
//        这里multiple参数的含义是
//        true:将小于该tag值的消息确认
//        falase:只确认当前消息
        channel.basicAck(tag, false);
//        出错了,通知MQ把消息塞回的队列头部(不是尾部)
//        channel.basicNack(tag,false,true);
    }
}

相关文章

网友评论

      本文标题:rabbitMQ基本(消息确认 对象传输)

      本文链接:https://www.haomeiwen.com/subject/tdtipctx.html