From 2b715d90fe2d6e460d1fce4167e55703740c7264 Mon Sep 17 00:00:00 2001 From: rays <1615175118@qq.com> Date: Mon, 7 Jun 2021 15:09:39 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4socket=E6=97=A0=E7=94=A8?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/enums/ClientMessageTypeEnum.java | 5 -- .../WebSocketApplicationRunnerImpl.java | 19 ----- .../websocket/operator/WebSocketOperator.java | 18 ++--- .../websocket/session/SessionCenter.java | 80 ------------------- 4 files changed, 6 insertions(+), 116 deletions(-) diff --git a/kernel-d-socket/socket-api/src/main/java/cn/stylefeng/roses/kernel/socket/api/enums/ClientMessageTypeEnum.java b/kernel-d-socket/socket-api/src/main/java/cn/stylefeng/roses/kernel/socket/api/enums/ClientMessageTypeEnum.java index 15732feb3..896445284 100644 --- a/kernel-d-socket/socket-api/src/main/java/cn/stylefeng/roses/kernel/socket/api/enums/ClientMessageTypeEnum.java +++ b/kernel-d-socket/socket-api/src/main/java/cn/stylefeng/roses/kernel/socket/api/enums/ClientMessageTypeEnum.java @@ -13,11 +13,6 @@ import lombok.Getter; @Getter public enum ClientMessageTypeEnum { - /** - * 添加用户监听的消息类型 - */ - USER_ADD_MSG_TYPE("200001", "用户添加一个监听的消息类型"), - /** * 用户心跳消息类型 */ diff --git a/kernel-d-socket/socket-business-websocket/src/main/java/cn/stylefeng/roses/kernel/socket/business/websocket/spring/WebSocketApplicationRunnerImpl.java b/kernel-d-socket/socket-business-websocket/src/main/java/cn/stylefeng/roses/kernel/socket/business/websocket/spring/WebSocketApplicationRunnerImpl.java index 5a34e1c99..cf542c83c 100644 --- a/kernel-d-socket/socket-business-websocket/src/main/java/cn/stylefeng/roses/kernel/socket/business/websocket/spring/WebSocketApplicationRunnerImpl.java +++ b/kernel-d-socket/socket-business-websocket/src/main/java/cn/stylefeng/roses/kernel/socket/business/websocket/spring/WebSocketApplicationRunnerImpl.java @@ -1,21 +1,14 @@ package cn.stylefeng.roses.kernel.socket.business.websocket.spring; -import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi; import cn.stylefeng.roses.kernel.socket.api.expander.SocketConfigExpander; -import cn.stylefeng.roses.kernel.socket.websocket.message.WebSocketMessagePOJO; import cn.stylefeng.roses.kernel.socket.websocket.server.WebSocketServer; -import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter; import com.gettyio.core.channel.config.ServerConfig; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; - import java.net.StandardSocketOptions; -import static cn.stylefeng.roses.kernel.socket.api.enums.ClientMessageTypeEnum.*; - /** * Spring Boot启动完成拉起WebSocket * @@ -26,9 +19,6 @@ import static cn.stylefeng.roses.kernel.socket.api.enums.ClientMessageTypeEnum.* @Slf4j public class WebSocketApplicationRunnerImpl implements ApplicationRunner { - @Autowired - private SocketOperatorApi socketOperatorApi; - @Override public void run(ApplicationArguments args) { // 初始化配置对象 @@ -51,14 +41,5 @@ public class WebSocketApplicationRunnerImpl implements ApplicationRunner { WebSocketServer.run(aioServerConfig); log.info("WebSocket Server Start Success!"); - - // 添加用户新增消息类型的回调 - socketOperatorApi.msgTypeCallback(USER_ADD_MSG_TYPE.getCode(), (msgType, msg, socketSession) -> { - // 转换对象 - WebSocketMessagePOJO webSocketMessage = (WebSocketMessagePOJO)msg; - - // 维护会话中心的消息类型 - SessionCenter.addSocketSessionMsgType(webSocketMessage.getData().toString(), webSocketMessage.getFormUserId()); - }); } } diff --git a/kernel-d-socket/socket-sdk-websocket/src/main/java/cn/stylefeng/roses/kernel/socket/websocket/operator/WebSocketOperator.java b/kernel-d-socket/socket-sdk-websocket/src/main/java/cn/stylefeng/roses/kernel/socket/websocket/operator/WebSocketOperator.java index b59702939..e007318fb 100644 --- a/kernel-d-socket/socket-sdk-websocket/src/main/java/cn/stylefeng/roses/kernel/socket/websocket/operator/WebSocketOperator.java +++ b/kernel-d-socket/socket-sdk-websocket/src/main/java/cn/stylefeng/roses/kernel/socket/websocket/operator/WebSocketOperator.java @@ -42,18 +42,12 @@ public class WebSocketOperator implements SocketOperatorApi { @Override public void sendMsgOfAllUserSession(String msgType, Object msg) { - // 获取监听该消息类型的所有会话 - List> socketSessionList = SessionCenter.getSocketSessionByMsgType(msgType); - - if (ObjectUtil.isNotEmpty(socketSessionList)) { - // 给所有会话发送消息 - for (SocketSession socketSession : socketSessionList) { - WebSocketMessagePOJO webSocketMessagePOJO = new WebSocketMessagePOJO(); - webSocketMessagePOJO.setData(msg); - webSocketMessagePOJO.setServerMsgType(msgType); - // 发送内容 - socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessagePOJO); - } + for (SocketSession socketSession : SessionCenter.getSocketSessionMap().values()) { + WebSocketMessagePOJO webSocketMessagePOJO = new WebSocketMessagePOJO(); + webSocketMessagePOJO.setData(msg); + webSocketMessagePOJO.setServerMsgType(msgType); + // 发送内容 + socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessagePOJO); } } diff --git a/kernel-d-socket/socket-sdk-websocket/src/main/java/cn/stylefeng/roses/kernel/socket/websocket/session/SessionCenter.java b/kernel-d-socket/socket-sdk-websocket/src/main/java/cn/stylefeng/roses/kernel/socket/websocket/session/SessionCenter.java index 812322b8d..b8be35edf 100644 --- a/kernel-d-socket/socket-sdk-websocket/src/main/java/cn/stylefeng/roses/kernel/socket/websocket/session/SessionCenter.java +++ b/kernel-d-socket/socket-sdk-websocket/src/main/java/cn/stylefeng/roses/kernel/socket/websocket/session/SessionCenter.java @@ -1,10 +1,8 @@ package cn.stylefeng.roses.kernel.socket.websocket.session; -import cn.hutool.core.util.ObjectUtil; import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession; import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator; -import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -23,11 +21,6 @@ public class SessionCenter { */ private static ConcurrentMap> socketSessionMap = new ConcurrentHashMap<>(); - /** - * 消息类型和用户ID关系维护 - */ - private static ConcurrentMap> messageTypeSessionMap = new ConcurrentHashMap<>(); - /** * 获取维护的所有会话 * @@ -39,17 +32,6 @@ public class SessionCenter { return socketSessionMap; } - /** - * 获取消息和用户ID的完整映射关系 - * - * @return {@link ConcurrentMap< String, Set>} - * @author majianguo - * @date 2021/6/1 下午2:14 - **/ - public static ConcurrentMap> getMessageTypeSessionMap() { - return messageTypeSessionMap; - } - /** * 根据用户ID获取会话详情 * @@ -70,66 +52,7 @@ public class SessionCenter { * @date 2021/6/1 下午1:49 **/ public static void addSocketSession(SocketSession socketSession) { - - // 维护会话 socketSessionMap.put(socketSession.getUserId(), socketSession); - - // 维护会话所有的消息类型和会话的关系 - if (ObjectUtil.isNotEmpty(socketSession.getMessageTypes())) { - for (String messageType : socketSession.getMessageTypes()) { - Set userIds = messageTypeSessionMap.get(messageType); - if (ObjectUtil.isEmpty(userIds)) { - userIds = new HashSet<>(); - messageTypeSessionMap.put(messageType, userIds); - } - userIds.add(socketSession.getUserId()); - } - } - } - - /** - * 根据消息类型获取所有的会话 - * - * @return {@link List< SocketSession>} - * @author majianguo - * @date 2021/6/1 下午2:06 - **/ - public static List> getSocketSessionByMsgType(String msgType) { - List> res = new ArrayList<>(); - - // 获取监听该消息所有的会话 - Set userIds = messageTypeSessionMap.get(msgType); - if (ObjectUtil.isNotEmpty(userIds)) { - for (String userId : userIds) { - SocketSession socketSession = socketSessionMap.get(userId); - res.add(socketSession); - } - } - - return res; - } - - /** - * 给会话添加监听的消息类型 - * - * @param msgType 消息类型 - * @param userId 用户ID - * @author majianguo - * @date 2021/6/1 下午2:11 - **/ - public static void addSocketSessionMsgType(String msgType, String userId) { - // 维护Session信息 - SocketSession socketSession = socketSessionMap.get(userId); - if (ObjectUtil.isNotEmpty(socketSession)) { - socketSession.getMessageTypes().add(msgType); - } - // 维护消息列表 - Set userIds = messageTypeSessionMap.get(msgType); - if (ObjectUtil.isEmpty(userIds)) { - userIds = new HashSet<>(); - messageTypeSessionMap.put(msgType, userIds); - } - userIds.add(userId); } /** @@ -141,8 +64,5 @@ public class SessionCenter { **/ public static void closed(String userId) { socketSessionMap.remove(userId); - for (Map.Entry> stringListEntry : messageTypeSessionMap.entrySet()) { - stringListEntry.getValue().removeIf(item -> item.equals(userId)); - } } }