---使用
package com.gxhj.safecampus.configuration.datapush;
import com.gxhj.safecampus.configuration.sysconfig.CampusConfig;
import com.gxhj.safecampus.configuration.websocket.WebSocketServer;
import com.gxhj.safecampus.utils.common.HttpConnectionUtil;
import com.gxhj.safecampus.warning.vo.WarningVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
-
信息推送对象
*/
@Component
public class DataPushBean {@Autowired
private WebSocketServer webSocketServer;/**
- 通过websocket推送黑名单预警信息至首页
- @param groupId 组织ID
- @param warningVo 预警信息
*/
public void sendWarningPersonByWebSocket(String groupId, WarningVo warningVo) {
webSocketServer.sendInfo(groupId, warningVo);
}
/**
- 将提醒信息推送至微信端
- @param message 提醒信息
- @return {} 返回请求结果
*/
public String sendMessageByWeChat(String message) {
return HttpConnectionUtil.doPost(CampusConfig.getHoverParamConfig().getNoteInformUrl(), message);
}
}
---Java配置类
package com.gxhj.safecampus.configuration.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
-
socket配置类,往 spring 容器中注入ServerEndpointExporter实例
*/
@Configuration
public class WebSocketConfig {@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
---Java
package com.gxhj.safecampus.configuration.websocket;
import com.gxhj.safecampus.middleware.util.MiddlewareUtil;
import com.gxhj.core.exception.BusinessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
-
WebSocket服务端代码,包含接收消息,推送消息等接口
*/
@Component
@ServerEndpoint(value = "/socket/{groupId}") // 只是用来标记这个类是websocket服务类
public class WebSocketServer {private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
//用来存放每个客户端对应的WebSocketServer对象。
private static Map<String, Session> sessionPools = new ConcurrentHashMap<>();/**
- 发送消息方法
- @param session 客户端与socket建立的会话
- @param message 消息
- @throws IOException
*/
public void sendMessage(Session session, String message) throws IOException {
if (session != null) {
session.getBasicRemote().sendText(message);
}
}
/**
* 连接建立成功调用
*
* @param session 客户端与socket建立的会话
* @param groupId 客户端的groupId
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "groupId") String groupId) {
sessionPools.put(groupId, session);
}
/**
* 关闭连接时调用
*
* @param groupId 关闭连接的客户端的组织id
*/
@OnClose
public void onClose(@PathParam(value = "groupId") String groupId) {
sessionPools.remove(groupId);
}
/**
* 收到客户端消息时触发---可用于心跳检测
*
* @param message
* @throws IOException
*/
@OnMessage
public void onMessage(Session session, String message) {
try {
sendMessage(session, message);
} catch (Exception e) {
log.error("websocket发送信息失败--发送心跳检测信息失败", e);
}
}
/**
* 发生错误时候
*
* @param session
* @param throwable
*/
@OnError
public void onError(Session session, Throwable throwable) {
System.out.println("发生错误");
log.error("websocket发生错误", throwable);
}
/**
* 给指定用户发送消息
*
* @param groupId 组织id
* @param message 消息
* @throws IOException
*/
public void sendInfo(String groupId, Object message) {
//遍历所有websocket登录用户
for (String key : sessionPools.keySet()) {
//给所有该组织的用户发送消息
if (key.startsWith(groupId)) {
try {
sendMessage(sessionPools.get(key), MiddlewareUtil.getSerialObjMapper().writeValueAsString(message));
} catch (Exception e) {
throw new BusinessException("websocket消息推送异常", e);
}
}
}
}
}
网友评论