...">
美文网首页工作生活
SpringBoot 整合 ActiveMQ

SpringBoot 整合 ActiveMQ

作者: 大鱼炖海棠 | 来源:发表于2019-06-30 21:56 被阅读0次

一、添加 Maven 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- 如果使用线程池,需要引入在 pom 中加入以下依赖 -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.9</version>
</dependency>

二、YML文件配置

YML 格式化显示需要安装插件。依次选中"Eclipse菜单 -> Help -> Eclipse MarketPlace",在弹窗中搜索"Spring Tools":


Eclipse MarketPlace

点击右下角的 Install 安装插件。
安装完毕后重启 IDE,右键 YML文件,选择打开方式:


Open With

此时配置文件以添加样式形式展示:


application.yml

接下来进入正题配置 ActiveMQ:

spring:
  application:
    name: common
  activemq:
    broker-url: tcp://192.168.18.150:61616
    user: admin
    password: admin
    pool:
      enabled: true
      max-connections: 10

server:
  port: 9102
  
mq:
  active:
    count-queue-name: wordsCount
    query-queue-name: queryNumOfWord

从上述配置可知 MQ 的用户名、密码和代理 URL,并启用了线程池,最大连接数设置为10。此外还定义了 2 个队列 wordsCount 和 queryNumOfWord。文本拆分成单词计数的任务会投递到 wordsCount 队列,单词出现次数的查询交易会投递到 queryNumOfWord 队列。

三、编写 ActiveMQManager.java

ActiveMQManager 注入了JmsMessagingTemplate 类,通过调用该类实例的 convertAndSend 方法进行消息的投递。

package com.galaxy.common.mq.activemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

@Component
public class ActiveMQManager {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
 
    /**
     * @param data
     * @desc 即时发送
     */
    public void sendMsg(String destination,String data) {
        this.jmsMessagingTemplate.convertAndSend(destination, data);
    }
}

四、编写 ActiveMQProducer.java

负责计数任务和查询任务的投递。

    @Value(value = "${mq.active.count-queue-name}")
    private String COUNT_QUEUE_NAME;
    
    @Value(value = "${mq.active.query-queue-name}")
    private String QUERY_QUEUE_NAME;

    @Autowired
    private ActiveMQManager mqManager;

    public String sendText(String text) {
        logger.info("发送的文本内容:{}", text);
        
        try {
            mqManager.sendMsg(COUNT_QUEUE_NAME, text);
        } catch (Exception e){
            e.printStackTrace();
            logger.error(e.getMessage());
        }
        
        return "SUCESS";
    }

    public String queryWordCount(String word) {
        logger.info("查询单词计数:{}", word);
        
        try {
            mqManager.sendMsg(QUERY_QUEUE_NAME, word);
        } catch (Exception e){
            e.printStackTrace();
            logger.error(e.getMessage());
        }
        
        return "SUCESS";
    }

五、编写 ActiveMQConsumer.java
消费者 API 接口通过添加 @JmsListener 注解监听指定的端口,将队列中的消息取出消费。

    @Autowired
    private StringRedisTemplate redisTemplate;

    @JmsListener(destination = "wordsCount")
    public void wordsCount(String text) {
        logger.info("待处理文本内容:{}", text);
        
        text = text.replaceAll( "[\\p{P}+~$`^=|<>~`$^+=|<>¥×]" , " ");
        logger.info("正则过滤标点符号后的文本内容:{}", text);
        
        String[] words = text.split(" ");
        logger.info("切割出的单词数量:{}", words.length);
        
        logger.info("【连接redis服务器进行计数……】");
        for (String word : words) {
            if (StringUtils.isEmpty(word)) {
                continue;
            }

            redisTemplate.opsForValue().increment(word);
        }
        
        logger.info("【redis服务器计数完成】");
    }

    @JmsListener(destination = "queryNumOfWord")
    public void queryNumOfWord(String word) {
        logger.info("目标单词:{}", word);
        
        logger.info("【连接redis服务器查询该单词的计数】");
        String count = redisTemplate.opsForValue().get(word);
        logger.info("{}:{}", word, count);
    }

相关文章

网友评论

    本文标题:SpringBoot 整合 ActiveMQ

    本文链接:https://www.haomeiwen.com/subject/isricctx.html