美文网首页
消息队列总线设计及实现(一)

消息队列总线设计及实现(一)

作者: 西城丶 | 来源:发表于2021-05-05 00:03 被阅读0次

背景

在我们的系统中,采用了多种消息队列,有RabbitMQ、RocketMQ、Pulsar。因为各个项目中采用的实现方式都不同,且很多都是同样的代码,现需要整合多种消息队列,需要实现几个点。

  1. 统一化配置文件,防止mq的配置信息一大堆,配置文件过多。
  2. 生产者发送消息需要全局统一,使用一个类就可以实现发送消息。
  3. 消费者可以自己决定消费使用的线程数。
  4. 暂时只需要实现主备两个server地址。

现已在线上稳定运行8个月,单个topic日均流量百万级别,未出现消费和发送异常。

设计

基本思路

  1. 消息队列客户端,在项目初始化的时候根据配置参数进行初始化,客户端支持三种连接,主/备/全部
  2. 消息队列发送方,根据客户端和发送类型发送消息,屏蔽各个子类的细节,提供公共方法发送消息,在使用的时候才进行注册(即调用发送消息方法的时候才进行注册)
  3. 消息队列接收方,根据配置的客户端和接收类型,提供基类,各个子系统自行注册bean

统一化配置

由于各个消息队列需要使用的配置都不一样,所以我们需要一个基类来被子类继承。每个消息队列客户端、生产者和消费者需要的信息也不同,所以配置信息就设计为:

├─ BaseProperties

├── ClientProperties

├── ProducerProperties

├── ConsumerBaseProperties

├─── ConsumerProperties

最高级是BaseProperties,各个消息队列的ClientProperties/ProducerProperties/ComsumerBaseProperties继承BaseProperties,ComsumerProperties继承ComsumerBaseProperties,消费者多了一层继承主要是因为消费者消费的主备参数是多个消费者公用的,所以把这个参数统一抽取出来。

统一发送消息

这里将各自的消息队列发送消息封装为一个门面类,通过门面类去发送消息,门面类调用各自生产者的发送消息方法,外部可以不知道发送消息细节。

消费者设计

我们先来看下消费者需要什么信息,注册阶段,我们需要指定订阅的是什么地址,订阅的topic是什么,以及最主要的监听消息的类是什么。那么消费者可以使用线程处理消息,我们可以在哪个环节处理呢?肯定是监听消息处理消息的类那边对吧,如果这个消费者需要采用线程处理消息,那么在监听器里接收到消息的时候,调用自己构建的线程处理消息。

等等

消费者你说了那么多参数,有点乱了吧。那么,我们就来一次封装,我们把消费者的信息,封装为一个消费者上下文,这个消费者上下文可以贯穿消费者的全局,从注册,到接收消息,到处理消息,都能使用这个上下文获得消费者的信息。

那么这个消费者上下文需要什么信息呢?

  1. 消费者配置参数,这个必不可少,也就是上面说的ComsumerBaseProperties
  2. 线程池信息,既然都使用线程了,何不采用线程池来处理消息
  3. 接收者信息,像rocketMQ,在发送的时候可以传入一个tag信息,接收端需要根据这个tag信息来决定是采用那种策略处理

各种思路想的差不多,下篇开始具体实现~

相关文章

网友评论

      本文标题:消息队列总线设计及实现(一)

      本文链接:https://www.haomeiwen.com/subject/gegcdltx.html