序言
现在我们每天都要与信息打交道,主动或被动的在创造或接收消息。你会收到话费通知短信,使用微信 QQ跟远在万里的朋友交流,也可能使用钉钉跟同事讨论工作,使用抖音娱乐等等。信息要准确及时的发送和接收 这背后使用了 消息队列的相关技术。本文以RabbitMQ为例 讲解消息队列涉及的相关技术及使用场景,结合自身开发经验帮助读者更好理解这个隐藏在背后的这项“黑科技”
概括
常见的消息通讯方式有同步和异步两种,消息队列实现了异步通讯方式,即消息先存储到消息队列容器里 ,满足某种条件后 发送给消息的接收方,当然消息队列也可以实现同步通讯。队列(Queue)是一种数据结构 满足了先进先出的规则。这个消息的传输有生产者 交换机 消息队列 消费者4部分组成。
RabbitMQ是一种易扩展,高可用 实现了AMQP协议并被广泛应用的开源消息队列。

RabbitMQ队列属性简介
AMQP的属性在RabbitMQ体现:
1 持久化 即队列持久化保存到硬盘,如果服务重启 可以再次恢复队列
如在代码里配置两个 Queue
/**
* queue 持久化
* @return
*/
@Bean
public Queue DurableTrueQueue(){
return new Queue("durableQueue",true);
}
/**
* queue 非持久化
* @return
*/
@Bean
public Queue DurableFalseQueue(){
return new Queue("noDurableQueue",false);
}
如注释那样 队列durableQueue 是持久化队列 而 noDurableQueue是非持久化队列
在RabbmitMQ的管理后台可以看到这两个队列信息

重启RabbitMQ服务器 可以看到noDurableQueue 已经没了

2 独有性 排他性 即队列只为当前链接服务 链接断开 队列被删除
我对独有性理解就是 队列只服务于一个链接 链接消失 则队列也被删除
代码里配置排他性的 队列
@Bean
public Queue exclusiveTrueQueue(){
return new Queue("exclusiveTrueQueue",true,true,false);
}
在RabbitMQ后台管理页面可以看到 该队列

点击队列名称 exclusiveTrueQueue 进入队列详情页面
点击get Messages 里的 get Message 按钮 会报错 说该队列具有排他性 不能获取消息内容

3 自动删除 即消费者断开 队列被删除
自动删除 是指如果该队列对应的链接全部断开 则删除该队列 读者可参考自行实验
需要说明的是 如果队列的 exclusive 为true或auto delete为true 那durable属性是不起作用的 因为服务器重启 链接都会断掉 队列信息会被删除
4 其他可选队列参数参数属性(如队列长度 过期时间等)
仲裁队列
在集群环境下,仲裁队列可以提供一种高效的 保证数据安全的 数据传输能力,能保证在某一台服务器不可用情况下 主从服务的快速切换 和数据完整性复制 从而达到RabbitMQ的高可用和高性能,仲裁队列遵循Raft分布式协议。仲裁队列是RabbitMQ3.8.*版本以后加入的队列类型 用以替换之前版本的队列镜像
在RabbitMQ集群架构中 ,会有一个主实例和多个从实例,主实例负责跟发送端 接收端的交互,把接收到的数据 往从实例复制一份,即从实例是主实例的拷贝和备份,当主实例节点宕机或不可用时候 从实例中会选出一个新的主实例 进行消息的接收和发送 从而保证队列的可用性。
为保证主从节点数据一致性 ,只有当主节点的数据全部写入从节点时候,主节点才会跟发送方确认消息已接收。数据完整保存或最接近完整数据的从节点才有可能被选举为主节点 ,数据残缺不全或者数据要比其他节点少的从节点是不会被选为主节点的,即从节点的选择要最大可能保证数据的完整性。
仲裁队列创建:
@Bean
Queue quorumQueue(){
Map<String,Object> map = new HashMap<>();
map.put("x-queue-type","quorum");
return new Queue("quorumQueue",true,false,false,map);
}
在RabbitMQ管理后台可以看到已经创建成功:

消息过期时间(Time-To-Live)
RabbitMQ 队列里的消息如果超过了过期时间没有消费者接收就变成了死信。处于消息队列中的死信不能发送给客户端,消息服务器会在过期后将消息删除。
RabbitMQ 的过期时间(以下简称TTL)有两种类型设置:
1 设置单条消息的过期时间
2 以队列为单位设置消息的过期时间(代码,命令行两种方式)
两种设置时间的单位都是毫秒
以队列为单位设置消息的过期时间 需要在声明队列时候 设置消息过期参数
@Bean
public Queue ttlQueue(){
Map<String,Object> map = new HashMap<>();
map.put("x-message-ttl",60000);
return new Queue("ttlQueue",true,false,false,map);
}
如果x-message-ttl 设置为0,则该队列的消息需要同步接收 不能在队列里保存 否则会过期
设置成功后 在服务器管理页面可以看到TTL标识

设置单条消息过期时间

需要注意的是 MessageProperties 的Expiration类型为String格式
队列过期时间
队列过期时间指超过一定时间 队列没有被消费者消费 也没有被重新声明续租 消息队列节点会自动删除该队列
队列过期可以使用在 不断创建新队列 老的队列在没有一定时间没有使用后会自动删除释放资源,如 聊天场景消息的发送接收 RPC 通过不断创建队列传输消息 当聊天结束后 自动删除队列
如果对持久化队列设置了过期时间 当服务器重启后 过期时间会重新开始计算
设置方式是 在创建声明队列时候 传入参数x-expires
@Bean
public Queue ttlForQueue(){
Map<String,Object> map = new HashMap<>();
map.put("x-expires",60000);
return new Queue("ttlForQueue",true,false,false,map);
}

队列长度限制
队列的最大长度既可以限制队列里处理消息的数量 也可以限制处理消息总的字节大小。
这里的长度限制是指被消费者处理过的消息累积,不包括没有被消费者端接收的在途消息
当消息达到队列长度限制的时候,系统默认会将队列头部(最老的消息)删除或设置为死信。
可以设置消息超限的应对策略 参数为x-overflow 值对应为
1 drop-head(默认方式 删除队列最早消息)
2 reject-publish(拒绝新的消息加入)
@RequestMapping(value = "/limitQueue",method = RequestMethod.GET)
public void sendMessageForLimitQueue(){
Map<String,Object> args = new HashMap<>();
args.put("x-overflow","drop-head");
args.put("x-max-length",5);
//声明数量受限的队列
Queue queue = new Queue("limitQueue",true,false,false,args);
rabbitAdmin.declareQueue(queue);
//绑定到交换机
Binding binding = new Binding("limitQueue",Binding.DestinationType.QUEUE,"testExchange","limitQueueRoutkey",null);
rabbitAdmin.declareBinding(binding);
String content = "hello this is a test message ";
Map<String,Object> map = makeMessage();
map.put("content",content);
System.out.println("开始发送。。"+map.toString());
rabbitTemplate.convertAndSend("testExchange","limitQueueRoutkey",map);
}
网友评论