持久化
持久化可以提高RabbitMQ的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。
1.交换器的持久化
交换器的持久化是在声明交换器的时候,将durable设置为true。如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关交换器的元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中。对于长期使用的交换器来说,建议将其置为持久化。
原生Api
/**
* 参数1:交换机名称
* 参数2:交换机类型
* 参数3:是否持久化
*/
channel.exchangeDeclare("logs_direct", BuiltinExchangeType.DIRECT,true);
SpringBoot
@Bean
public TopicExchange payTopicExchange(){
/**
* 参数1:交换机类型
* 参数2:是否持久化 true 是
* 参数3:是否自动删除 true 是
*/
return new TopicExchange(exchangeMame,true,false);
}
2.队列对持久化
队列的持久化在声明队列的时候,将durable参数设置为true。如果队列不设置持久化,那么RabbitMQ服务重启之后,相关队列的元数据会丢失,同时队列中的消息也会丢失。
原生Api
/**
* 参数1:String queue 队列名称 如果队列不存在会自动创建
* 参数2:boolean durable 队列是否持久化 true 持久化 false 不持久化
* 参数3:boolean exclusive 是否独占队列 true 独占队列 false 不独占
* 参数4:boolean autoDelete 是否在消费完成后自动删除 true 自动删除
* 参数5:Map<String, Object> arguments 额外附加参数
*/
channel.queueDeclare("hello-1",true,false,false,null);
SpringBoot
@Bean
public Queue dlQueue(){
/**
* 参数1:队列名称
* 参数2:是否持久化
*/
return new Queue(dlQueue,true);
}
3.消息的持久化
队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。
在发送消息的时候,通过将BasicProperties中的属性deliveryMode(投递模式)设置为2
即可实现消息的持久化。
原生Api
channel.basicPublish("exchangeName" , "routingKey",
new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.build(),
"ddf".getBytes());
SpringBoot
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
//设置信息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
};
rabbitTemplate.convertAndSend("exchangeName","routingKey","消息内容",messagePostProcessor);
可以将所有的消息都设置为持久化,但是这样会严重影响 RabbitMQ的性能。写入 磁盘的速度比写入内存的速度慢得不只一点点。对于可靠性不是那么高的消息可以不采用持久 化处理,以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做权衡。
将交换器、队列和消息都进行持久化操作后,就能高枕无忧了么?答案是否
定的。
1、对于消费者来说,如果在订阅消息的时候,将autoAck设置为true,那么消费者接收到相关消息后,还没有处理,就出现了异常挂掉了,这样也算数据丢失。这种情况很好解决,将autoAck 参数设置为 false ,并进行手动确认。
原生aip
/**
* 参数1:队列名称
* 参数2:消息自动确认 true 消费者自动向mq确认消息已经消费 false 不会自动确认
* 参数3:消费者的回调函数
*/
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费-2:" + new String(body));
/**
* 参数1:确认队列中那个消息被消费了
* 参数2:是否开启多个消息同时确认 true 开启
*/
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
SpringBoot
yml:
spring:
rabbitmq:
listener:
simple:
# 手动确认
acknowledge-mode: manual
java:
channel.basicAck(tag, false);
2、在持久化的消息正确存入 RabbitMQ 之后,还需要有一段时间(虽然很短,但是不可忽视)才能存入磁盘之中。RabbitMQ并不会为每条消息都进行同步存盘的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMQ服务发生了岩机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。这里可以引入 RabbitMQ 镜像队列机制,相当于配置了副本,如果主节点( master )在此特殊时间内挂掉,可以自动切换到从节点( lave ),
这样有效地保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全保证 RabbitMQ 消息不丢失,但是配置了镜像队列要比没有配置镜像队列的可靠性要高很多,在实际生产环境中的关键业务队列一般都会设置镜像队列。
以上知识均来自于朱忠华大佬
《RabbitMQ实战指南》一书

网友评论