From 8dc91c33e5d27ac5d19529c0f333512d348704f4 Mon Sep 17 00:00:00 2001 From: zhangdaiscott Date: Thu, 22 Sep 2022 15:50:56 +0800 Subject: [PATCH] =?UTF-8?q?websocket=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message/websocket/SocketHandler.java | 11 +- .../modules/message/websocket/WebSocket.java | 168 ++++++++++-------- 2 files changed, 96 insertions(+), 83 deletions(-) diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/message/websocket/SocketHandler.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/message/websocket/SocketHandler.java index 53eb9dc1..4d079718 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/message/websocket/SocketHandler.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/message/websocket/SocketHandler.java @@ -9,11 +9,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** - * 监听消息(采用redis发布订阅方式发送消息) + * 监听消息(通过redis发布订阅,推送消息) + * 此方案:解决集群部署的问题,多实例节点(也就是发送消息端先发送消息到redis中,每个服务节点收到redis消息,再触发具体的ws推送) * @author: jeecg-boot */ @Slf4j -@Component +@Component(WebSocket.REDIS_TOPIC_NAME) public class SocketHandler implements JeecgRedisListener { @Autowired @@ -21,15 +22,17 @@ public class SocketHandler implements JeecgRedisListener { @Override public void onMessage(BaseMap map) { - log.info("【SocketHandler消息】Redis Listerer:" + map.toString()); + log.info("【Redis发布订阅模式】redis Listener: {},参数:{}",WebSocket.REDIS_TOPIC_NAME, map.toString()); String userId = map.get("userId"); String message = map.get("message"); if (ObjectUtil.isNotEmpty(userId)) { + //pc端消息推送具体人 webSocket.pushMessage(userId, message); - //app端消息推送 + //app端消息推送具体人 webSocket.pushMessage(userId+CommonSendStatus.APP_SESSION_SUFFIX, message); } else { + //推送全部 webSocket.pushMessage(message); } diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/message/websocket/WebSocket.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/message/websocket/WebSocket.java index f5c2de5b..c45d9fac 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/message/websocket/WebSocket.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/message/websocket/WebSocket.java @@ -1,25 +1,17 @@ package org.jeecg.modules.message.websocket; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; - import javax.annotation.Resource; -import javax.websocket.OnClose; -import javax.websocket.OnMessage; -import javax.websocket.OnOpen; -import javax.websocket.Session; +import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; +import com.alibaba.fastjson.JSONObject; import org.jeecg.common.base.BaseMap; import org.jeecg.common.constant.WebsocketConst; import org.jeecg.common.modules.redis.client.JeecgRedisClient; import org.springframework.stereotype.Component; - -import com.alibaba.fastjson.JSONObject; - import lombok.extern.slf4j.Slf4j; /** @@ -31,109 +23,126 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @ServerEndpoint("/websocket/{userId}") public class WebSocket { - - private Session session; + + /**线程安全Map*/ + private static ConcurrentHashMap sessionPool = new ConcurrentHashMap<>(); /** - * 用户ID + * Redis触发监听名字 */ - private String userId; - - private static final String REDIS_TOPIC_NAME = "socketHandler"; - + public static final String REDIS_TOPIC_NAME = "socketHandler"; @Resource private JeecgRedisClient jeecgRedisClient; - /** - * 缓存 webSocket连接到单机服务class中(整体方案支持集群) - */ - private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>(); - /** - * 线程安全Map - */ - private static ConcurrentHashMap sessionPool = new ConcurrentHashMap<>(); + //==========【websocket接受、推送消息等方法 —— 具体服务节点推送ws消息】======================================================================================== @OnOpen public void onOpen(Session session, @PathParam(value = "userId") String userId) { try { - //TODO 通过header中获取token,进行check - this.session = session; - this.userId = userId; - webSockets.add(this); sessionPool.put(userId, session); - log.info("【websocket消息】有新的连接,总数为:" + webSockets.size()); + log.info("【系统 WebSocket】有新的连接,总数为:" + sessionPool.size()); } catch (Exception e) { } } @OnClose - public void onClose() { + public void onClose(@PathParam("userId") String userId) { try { - webSockets.remove(this); - sessionPool.remove(this.userId); - log.info("【websocket消息】连接断开,总数为:" + webSockets.size()); - } catch (Exception e) { - } - } - - - /** - * 服务端推送消息 - * - * @param userId - * @param message - */ - public void pushMessage(String userId, String message) { - Session session = sessionPool.get(userId); - if (session != null && session.isOpen()) { - try { - //update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU - synchronized (session){ - log.info("【websocket消息】 单点消息:" + message); - session.getBasicRemote().sendText(message); - } - //update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - /** - * 服务器端推送消息 - */ - public void pushMessage(String message) { - try { - webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message)); + sessionPool.remove(userId); + log.info("【系统 WebSocket】连接断开,总数为:" + sessionPool.size()); } catch (Exception e) { e.printStackTrace(); } } + /** + * ws推送消息 + * + * @param userId + * @param message + */ + public void pushMessage(String userId, String message) { + for (Map.Entry item : sessionPool.entrySet()) { + //userId key值= {用户id + "_"+ 登录token的md5串} + //TODO vue2未改key新规则,暂时不影响逻辑 + if (item.getKey().contains(userId)) { + Session session = item.getValue(); + try { + //update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU + synchronized (session){ + log.info("【系统 WebSocket】推送单人消息:" + message); + session.getBasicRemote().sendText(message); + } + //update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU + } catch (Exception e) { + log.error(e.getMessage(),e); + } + } + } + } + /** + * ws遍历群发消息 + */ + public void pushMessage(String message) { + try { + for (Map.Entry item : sessionPool.entrySet()) { + try { + item.getValue().getAsyncRemote().sendText(message); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + log.info("【系统 WebSocket】群发消息:" + message); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + + + /** + * ws接受客户端消息 + */ @OnMessage - public void onMessage(String message) { - //todo 现在有个定时任务刷,应该去掉 - log.debug("【websocket消息】收到客户端消息:" + message); + public void onMessage(String message, @PathParam(value = "userId") String userId) { + if(!"ping".equals(message) && !WebsocketConst.CMD_CHECK.equals(message)){ + log.info("【系统 WebSocket】收到客户端消息:" + message); + }else{ + log.debug("【系统 WebSocket】收到客户端消息:" + message); + } + + //------------------------------------------------------------------------------ JSONObject obj = new JSONObject(); //业务类型 obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK); //消息内容 obj.put(WebsocketConst.MSG_TXT, "心跳响应"); - //update-begin-author:taoyan date:20220308 for: 消息通知长连接启动心跳机制,后端代码小bug #3473 - for (WebSocket webSocket : webSockets) { - webSocket.pushMessage(obj.toJSONString()); - } - //update-end-author:taoyan date:20220308 for: 消息通知长连接启动心跳机制,后端代码小bug #3473 + this.pushMessage(userId, obj.toJSONString()); + //------------------------------------------------------------------------------ } + /** + * 配置错误信息处理 + * + * @param session + * @param t + */ + @OnError + public void onError(Session session, Throwable t) { + log.warn("【系统 WebSocket】消息出现错误"); + //t.printStackTrace(); + } + //==========【系统 WebSocket接受、推送消息等方法 —— 具体服务节点推送ws消息】======================================================================================== + + + //==========【采用redis发布订阅模式——推送消息】======================================================================================== /** * 后台发送消息到redis * * @param message */ public void sendMessage(String message) { - log.info("【websocket消息】广播消息:" + message); + //log.info("【系统 WebSocket】广播消息:" + message); BaseMap baseMap = new BaseMap(); baseMap.put("userId", ""); baseMap.put("message", message); @@ -141,7 +150,7 @@ public class WebSocket { } /** - * 此为单点消息 + * 此为单点消息 redis * * @param userId * @param message @@ -154,7 +163,7 @@ public class WebSocket { } /** - * 此为单点消息(多人) + * 此为单点消息(多人) redis * * @param userIds * @param message @@ -164,5 +173,6 @@ public class WebSocket { sendMessage(userId, message); } } - + //=======【采用redis发布订阅模式——推送消息】========================================================================================== + } \ No newline at end of file