美文网首页
2020-07-29--stream消息驱动

2020-07-29--stream消息驱动

作者: 李霖神谷 | 来源:发表于2020-07-29 13:13 被阅读0次

stream是消息驱动框架,类似于jdbc驱动,它整合了kafka、rabitmq。使得开发人员只专注于底层的业务逻辑,不关注用的是什么消息中间件。
1.demo:

pom文件:
 <!--rabbit驱动-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

配置文件:

server:
  port: 8801
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka/
  instance:
    ##标识你的服务名称
    instance-id: spring-cloud-config-consume
    ##标识你的ip地址
    prefer-ip-address: true
spring:
  application:
    name: springcloudstream8801
  cloud:
    stream:
      ##绑定rabbitmq信息
      binders:
        defaultRabbit:
          type: rabbit
          enviroment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      ##服务的整合处理
      bindings:
        output:
          ##使用exchange名称定义
          destination: studyExchange
          content-tpe: application/json
          binder: defaultRabbit

业务逻辑:
消息生产者:
//定义消息的推送通道
@EnableBinding(Source.class)
public class IMessegeOutImpl implements IMessegeOut {
    //        消息发送管道
    @Resource
    private MessageChannel output;
    public String outMessage() {
        output.send(MessageBuilder.withPayload("一剪梅").build());
        return null;
    }
}


消息消费者:
消费者配置文件里的output要改成input
@Component
@EnableBinding(Sink.class)
public class StreamController {
    @StreamListener(Sink.INPUT)
    public  void  input(Message<String> message ){
        System.out.println("我是消息消费者============="+message.getPayload());
    }
}


2.消息重复消费问题、持久化问题:
由于消费者被绑定之后都会生成不同的流水号的组,这里在配置文件中设置group,设置同一个组名。生产的消息会被均分到同组下的所有服务中去,解决了消息城府消费问题。
如果某一个服务待机挂掉同组的服务会消费未被接受的消息。

    bindings:
        input:
          ##使用exchange名称定义
          destination: studyExchange
          content-tpe: application/json
          binder: defaultRabbit
          group: lishuaiA

相关文章

网友评论

      本文标题:2020-07-29--stream消息驱动

      本文链接:https://www.haomeiwen.com/subject/fwchrktx.html