一 Spring Cloud Stream的理解
1.1 Spring Cloud Stream概念
Spring Cloud Stream提供一种解耦的方式,将各个中间件的实现细节进行整合,对外提供统一的接口。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的)通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。
- 绑定器(Binder)
在没有绑定器这个概念的情况下,我们的Spring Boot应用要直接与消息中间件进行信息交互,各中间件的实现细节差异较大,不利于扩展升级。通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用的逻辑。
- 发布/订阅模型
在Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之 后,它会通过共享的 Topic /Exchange 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。
1.2 架构图

二 Spring Cloud Stream 的实现
2.0 安装并启动rabbitMQ
rabbitmq-server
2.1 消息生产者配置
- 引入stream依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
- Spring Cloud Stream 以内置binder,无需编写
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
- 编写配置文件
server:
port: 8081
spring:
application:
name: stream-producer
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
output:
destination: stream-default # 指定消息发送的目的地
contentType: text/plain # 指定消息的类型
binders:
defaultRabbit:
type: rabbit
- 编写发送消息的类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@EnableBinding(Source.class)
public class MessageProducer {
@Autowired
@Qualifier("output")
private MessageChannel output;
public void send(Object message){
output.send(MessageBuilder.withPayload(message).build());
}
}
- 编写启动类
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class,args);
}
}
- 测试发送消息
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class TestProducer {
@Autowired
private MessageProducer messageProducer;
@Test
public void testSend(){
String message = "Hello World";
messageProducer.send(message);
}
}
2.2 消息消费者配置
- 引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
- Spring Cloud Stream 以内置binder,无需编写
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
- 编写配置文件
server:
port: 8082
spring:
application:
name: stream-consumer
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input:
destination: stream-default # 指定消息发送的目的地
binders:
defaultRabbit:
type: rabbit
- 编写接收消息的监听类
@EnableBinding(Sink.class)
public class MessageConsumer {
@StreamListener(Sink.INPUT)
public void messageListener(Message<String> message){
System.out.println("监听的消息:"+message.getPayload());
}
}
- 启动类
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class,args);
}
}
三 Stream消息分组
在同一组的情况下,只会有其中一个消费者消费消息(经测试各消费者采用的是轮询的方式消费消息)
server:
port: 8082
spring:
application:
name: stream-consumer
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input:
destination: stream-default
group: group1 # 消息分组
binders:
defaultRabbit:
type: rabbit
四 Stream消息分区
消息分区的应用场景:同一个特征的数据被同一个实例消费, 比如同一个id的传感器监测数据必须被同一个实例统计计算分析。
- 消费者的配置
server:
port: 8082
spring:
application:
name: stream-consumer
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
instance-count: 2 # 当前消费者的总实例数
instance-index: 0 # 当前实例的索引号
bindings:
input:
destination: stream-default
group: group1 # 该分组不能去掉
consumer:
partitioned: true # 开启分区支持
binders:
defaultRabbit:
type: rabbit
- 生产者的配置
server:
port: 8081
spring:
application:
name: stream-producer
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
output:
destination: stream-default
content-type: text/plain
producer:
partition-key-expression: payload # 分区键的表达式规则
partition-count: 2 # 消息分区的数量
binders:
defaultRabbit:
type: rabbit
网友评论