一、添加 Maven 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 如果使用线程池,需要引入在 pom 中加入以下依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.9</version>
</dependency>
二、YML文件配置
YML 格式化显示需要安装插件。依次选中"Eclipse菜单 -> Help -> Eclipse MarketPlace",在弹窗中搜索"Spring Tools":

点击右下角的 Install 安装插件。
安装完毕后重启 IDE,右键 YML文件,选择打开方式:

此时配置文件以添加样式形式展示:

接下来进入正题配置 ActiveMQ:
spring:
application:
name: common
activemq:
broker-url: tcp://192.168.18.150:61616
user: admin
password: admin
pool:
enabled: true
max-connections: 10
server:
port: 9102
mq:
active:
count-queue-name: wordsCount
query-queue-name: queryNumOfWord
从上述配置可知 MQ 的用户名、密码和代理 URL,并启用了线程池,最大连接数设置为10。此外还定义了 2 个队列 wordsCount 和 queryNumOfWord。文本拆分成单词计数的任务会投递到 wordsCount 队列,单词出现次数的查询交易会投递到 queryNumOfWord 队列。
三、编写 ActiveMQManager.java
ActiveMQManager 注入了JmsMessagingTemplate 类,通过调用该类实例的 convertAndSend 方法进行消息的投递。
package com.galaxy.common.mq.activemq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
@Component
public class ActiveMQManager {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* @param data
* @desc 即时发送
*/
public void sendMsg(String destination,String data) {
this.jmsMessagingTemplate.convertAndSend(destination, data);
}
}
四、编写 ActiveMQProducer.java
负责计数任务和查询任务的投递。
@Value(value = "${mq.active.count-queue-name}")
private String COUNT_QUEUE_NAME;
@Value(value = "${mq.active.query-queue-name}")
private String QUERY_QUEUE_NAME;
@Autowired
private ActiveMQManager mqManager;
public String sendText(String text) {
logger.info("发送的文本内容:{}", text);
try {
mqManager.sendMsg(COUNT_QUEUE_NAME, text);
} catch (Exception e){
e.printStackTrace();
logger.error(e.getMessage());
}
return "SUCESS";
}
public String queryWordCount(String word) {
logger.info("查询单词计数:{}", word);
try {
mqManager.sendMsg(QUERY_QUEUE_NAME, word);
} catch (Exception e){
e.printStackTrace();
logger.error(e.getMessage());
}
return "SUCESS";
}
五、编写 ActiveMQConsumer.java
消费者 API 接口通过添加 @JmsListener 注解监听指定的端口,将队列中的消息取出消费。
@Autowired
private StringRedisTemplate redisTemplate;
@JmsListener(destination = "wordsCount")
public void wordsCount(String text) {
logger.info("待处理文本内容:{}", text);
text = text.replaceAll( "[\\p{P}+~$`^=|<>~`$^+=|<>¥×]" , " ");
logger.info("正则过滤标点符号后的文本内容:{}", text);
String[] words = text.split(" ");
logger.info("切割出的单词数量:{}", words.length);
logger.info("【连接redis服务器进行计数……】");
for (String word : words) {
if (StringUtils.isEmpty(word)) {
continue;
}
redisTemplate.opsForValue().increment(word);
}
logger.info("【redis服务器计数完成】");
}
@JmsListener(destination = "queryNumOfWord")
public void queryNumOfWord(String word) {
logger.info("目标单词:{}", word);
logger.info("【连接redis服务器查询该单词的计数】");
String count = redisTemplate.opsForValue().get(word);
logger.info("{}:{}", word, count);
}
网友评论