RabbitMQ简单使用:
参考:https://www.jianshu.com/p/dca01aad6bc8
(springboot + rabbitmq发送邮件(保证消息100%投递成功并被消费))
对于RabbitMQ进行对象传输的情景,添加如下的配置,注意实体类实现序列化接口
发送端:
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
接收端:
/**
 * Registers the message converter used on the receiving side.
 *
 * @return a {@code ContentTypeDelegatingMessageConverter} constructed with a
 *         {@code Jackson2JsonMessageConverter}
 */
@Bean
public MessageConverter messageConverter() {
    Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    return new ContentTypeDelegatingMessageConverter(jsonConverter);
}
这里的简单demo由消息发送端和消息消费端构成,分别对应一个工程,实现消息确认以及对象的发送与接收。
消息确认总体来说,就是发送端确认两点:1.消息成功发送到Exchange;2.消息成功由Exchange路由到Queue。消费端确认一点:在执行完所有的逻辑后,通过channel.basicAck确认消息已经被消费。

application.properties
# HTTP port of the sender application (standard Spring Boot property)
server.port=9000
# RabbitMQ broker connection: host, port, credentials and virtual host
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# Whether to enable publisher confirms; the sender's RabbitTemplate registers a
# confirm callback that reports whether a message reached the exchange
spring.rabbitmq.publisher-confirms=true
# Whether to enable publisher returns; the sender's RabbitTemplate registers a
# return callback that reports messages the exchange could not route to a queue
spring.rabbitmq.publisher-returns=true
package com.myrabbit.simplesender;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Customises the {@link RabbitTemplate} used by the sender: JSON (Jackson) message
 * conversion, a confirm callback reporting whether a message reached the exchange,
 * and a return callback reporting messages the exchange could not route to a queue.
 *
 * @author huangQiChang
 */
@Configuration
public class RabbitMQConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;

    /**
     * Builds the application's {@link RabbitTemplate}.
     *
     * @return a template with the JSON converter, confirm callback, mandatory flag
     *         and return callback configured
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

        // Confirm callback: tells us whether the message was accepted by the exchange.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println(correlationData);
            if (ack) {
                System.out.println("收到ack");
            } else {
                // A nack means the message did not reach the exchange; report the
                // cause instead of dropping the failure silently.
                System.out.println("收到nack, cause = " + cause);
            }
        });

        // mandatory=true is required for the return callback to fire; otherwise a
        // message the exchange cannot route to any queue is discarded without a callback.
        rabbitTemplate.setMandatory(true);

        // Return callback: a failure-only callback, invoked only when routing from
        // the exchange to a queue fails.
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("===returnCallBack===");
            System.out.println("message = " + message);
            System.out.println("replyCode = " + replyCode);
            System.out.println("replyText = " + replyText);
            System.out.println("exchange = " + exchange);
            System.out.println("routingKey = " + routingKey);
        });
        return rabbitTemplate;
    }
}
package com.myrabbit.simplesender;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the exchanges, the queues and the bindings between them.
 *
 * @author huangQiChang
 */
@Configuration
public class RabbitMQComponentConfig {

    private static final String TOPIC_EXCHANGE_NAME = "myEx";
    private static final String TOPIC_QUEUE_NAME = "myQ.test";
    private static final String TOPIC_BINDING_PATTERN = "myQ.*";

    private static final String DIRECT_EXCHANGE_NAME = "myDire";
    private static final String DIRECT_QUEUE_NAME = "myDire.notIn";
    private static final String DIRECT_ROUTING_KEY = "myDire.notIn";

    /** Topic exchange named {@code myEx}. */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE_NAME);
    }

    /** Queue named {@code myQ.test}. */
    @Bean
    public Queue queue() {
        return new Queue(TOPIC_QUEUE_NAME);
    }

    /** Binds {@code myQ.test} to {@code myEx} with the pattern {@code myQ.*}. */
    @Bean
    public Binding binding() {
        Queue boundQueue = queue();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(boundQueue).to(exchange).with(TOPIC_BINDING_PATTERN);
    }

    /** Direct exchange named {@code myDire}, declared non-durable and auto-delete. */
    @Bean
    public DirectExchange directExchange() {
        boolean durable = false;
        boolean autoDelete = true;
        return new DirectExchange(DIRECT_EXCHANGE_NAME, durable, autoDelete);
    }

    /** Queue named {@code myDire.notIn}. */
    @Bean
    public Queue queue_not_in() {
        return new Queue(DIRECT_QUEUE_NAME);
    }

    /** Binds {@code myDire.notIn} to {@code myDire} with routing key {@code myDire.notIn}. */
    @Bean
    public Binding directExchange_queue_not_in_binding() {
        Queue boundQueue = queue_not_in();
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(boundQueue).to(exchange).with(DIRECT_ROUTING_KEY);
    }
}
package com.myrabbit.simplesender.sender;
import com.myrabbit.simplesender.pojo.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Publishes the demo messages through the configured {@link RabbitTemplate}.
 *
 * @author huangQiChang
 * @date 2019/10/8
 */
@Component
public class QueueSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Topic-exchange test: publishes a text message to {@code myEx} with routing
     * key {@code myQ.t}, which matches the {@code myQ.*} binding of {@code myQ.test}.
     */
    public void send() {
        rabbitTemplate.convertAndSend("myEx", "myQ.t", "this is a test msg");
    }

    /**
     * Direct-exchange test for the "no queue matches" case: publishes to the direct
     * exchange {@code myDire} with routing key {@code qwe}, which differs from the
     * only binding key declared for that exchange ({@code myDire.notIn}).
     */
    public void sendDirect() {
        // Target the direct exchange; the original published to the topic exchange
        // "myEx", which did not exercise direct routing at all.
        rabbitTemplate.convertAndSend("myDire", "qwe", "direct msg test");
    }

    /**
     * Object test: publishes a populated {@link User} to {@code myEx} with routing
     * key {@code myQ.test}.
     */
    public void sendObj() {
        User user = new User();
        user.setUserName("huangSir");
        user.setDesc("user's desc");
        rabbitTemplate.convertAndSend("myEx", "myQ.test", user);
    }
}
消息消费端
application.properties
# Manual acknowledgement: the @RabbitListener method in this project acks each
# delivery itself via channel.basicAck
spring.rabbitmq.listener.simple.acknowledge-mode=manual
/**
 * Consumer-side configuration that registers the message converter bean.
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates the converter bean for the consumer.
     *
     * @return a {@code ContentTypeDelegatingMessageConverter} constructed with a
     *         {@code Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return new ContentTypeDelegatingMessageConverter(jsonConverter);
    }
}
package com.mpt.simplelistener.listener;
import com.mpt.simplelistener.pojo.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * Consumes {@link User} messages from the {@code myQ.test} queue and acknowledges
 * each delivery manually on the channel.
 *
 * @author huangQiChang
 * @date 2019/10/8
 */
@Component
public class MyListener {

    /**
     * Handles one delivery: processes the payload, then acks it; if processing
     * throws, the delivery is nacked and requeued instead of being left unacked.
     *
     * @param msg     the raw message, used to read the delivery tag
     * @param channel the channel the delivery arrived on, used to ack or nack
     * @param user    the converted message payload
     * @throws IOException if the ack or nack cannot be sent on the channel
     */
    @RabbitListener(queues = "myQ.test")
    public void consume(Message msg, Channel channel, User user) throws IOException {
        MessageProperties properties = msg.getMessageProperties();
        // Every delivery carries a delivery tag that is unique within its channel
        // (the first one on a channel is 1, and each later one increments by 1).
        long tag = properties.getDeliveryTag();

        try {
            System.out.println("user = " + user);
            System.out.println("tag = " + tag);
        } catch (RuntimeException e) {
            // Processing failed: hand the message back to the broker for redelivery.
            // NOTE(review): requeue=true redelivers a persistently failing message
            // again and again; consider bounding retries or dead-lettering.
            System.err.println("consume failed, tag = " + tag + ", cause = " + e);
            channel.basicNack(tag, false, true);
            return;
        }

        // multiple=true would acknowledge every delivery up to and including this
        // tag; multiple=false acknowledges only the current delivery.
        channel.basicAck(tag, false);
    }
}
网友评论