删除socket无用逻辑

pull/22/head
rays 2021-06-07 15:09:39 +08:00
parent e23fc5d24b
commit 2b715d90fe
4 changed files with 6 additions and 116 deletions

View File

@ -13,11 +13,6 @@ import lombok.Getter;
@Getter @Getter
public enum ClientMessageTypeEnum { public enum ClientMessageTypeEnum {
/**
*
*/
USER_ADD_MSG_TYPE("200001", "用户添加一个监听的消息类型"),
/** /**
* *
*/ */

View File

@ -1,21 +1,14 @@
package cn.stylefeng.roses.kernel.socket.business.websocket.spring; package cn.stylefeng.roses.kernel.socket.business.websocket.spring;
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
import cn.stylefeng.roses.kernel.socket.api.expander.SocketConfigExpander; import cn.stylefeng.roses.kernel.socket.api.expander.SocketConfigExpander;
import cn.stylefeng.roses.kernel.socket.websocket.message.WebSocketMessagePOJO;
import cn.stylefeng.roses.kernel.socket.websocket.server.WebSocketServer; import cn.stylefeng.roses.kernel.socket.websocket.server.WebSocketServer;
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
import com.gettyio.core.channel.config.ServerConfig; import com.gettyio.core.channel.config.ServerConfig;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.net.StandardSocketOptions; import java.net.StandardSocketOptions;
import static cn.stylefeng.roses.kernel.socket.api.enums.ClientMessageTypeEnum.*;
/** /**
* Spring BootWebSocket * Spring BootWebSocket
* *
@ -26,9 +19,6 @@ import static cn.stylefeng.roses.kernel.socket.api.enums.ClientMessageTypeEnum.*
@Slf4j @Slf4j
public class WebSocketApplicationRunnerImpl implements ApplicationRunner { public class WebSocketApplicationRunnerImpl implements ApplicationRunner {
@Autowired
private SocketOperatorApi socketOperatorApi;
@Override @Override
public void run(ApplicationArguments args) { public void run(ApplicationArguments args) {
// 初始化配置对象 // 初始化配置对象
@ -51,14 +41,5 @@ public class WebSocketApplicationRunnerImpl implements ApplicationRunner {
WebSocketServer.run(aioServerConfig); WebSocketServer.run(aioServerConfig);
log.info("WebSocket Server Start Success!"); log.info("WebSocket Server Start Success!");
// 添加用户新增消息类型的回调
socketOperatorApi.msgTypeCallback(USER_ADD_MSG_TYPE.getCode(), (msgType, msg, socketSession) -> {
// 转换对象
WebSocketMessagePOJO webSocketMessage = (WebSocketMessagePOJO)msg;
// 维护会话中心的消息类型
SessionCenter.addSocketSessionMsgType(webSocketMessage.getData().toString(), webSocketMessage.getFormUserId());
});
} }
} }

View File

@ -42,18 +42,12 @@ public class WebSocketOperator implements SocketOperatorApi {
@Override @Override
public void sendMsgOfAllUserSession(String msgType, Object msg) { public void sendMsgOfAllUserSession(String msgType, Object msg) {
// 获取监听该消息类型的所有会话 for (SocketSession<GettySocketOperator> socketSession : SessionCenter.getSocketSessionMap().values()) {
List<SocketSession<GettySocketOperator>> socketSessionList = SessionCenter.getSocketSessionByMsgType(msgType); WebSocketMessagePOJO webSocketMessagePOJO = new WebSocketMessagePOJO();
webSocketMessagePOJO.setData(msg);
if (ObjectUtil.isNotEmpty(socketSessionList)) { webSocketMessagePOJO.setServerMsgType(msgType);
// 给所有会话发送消息 // 发送内容
for (SocketSession<GettySocketOperator> socketSession : socketSessionList) { socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessagePOJO);
WebSocketMessagePOJO webSocketMessagePOJO = new WebSocketMessagePOJO();
webSocketMessagePOJO.setData(msg);
webSocketMessagePOJO.setServerMsgType(msgType);
// 发送内容
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessagePOJO);
}
} }
} }

View File

@ -1,10 +1,8 @@
package cn.stylefeng.roses.kernel.socket.websocket.session; package cn.stylefeng.roses.kernel.socket.websocket.session;
import cn.hutool.core.util.ObjectUtil;
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession; import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator; import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -23,11 +21,6 @@ public class SessionCenter {
*/ */
private static ConcurrentMap<String, SocketSession<GettySocketOperator>> socketSessionMap = new ConcurrentHashMap<>(); private static ConcurrentMap<String, SocketSession<GettySocketOperator>> socketSessionMap = new ConcurrentHashMap<>();
/**
* ID
*/
private static ConcurrentMap<String, Set<String>> messageTypeSessionMap = new ConcurrentHashMap<>();
/** /**
* *
* *
@ -39,17 +32,6 @@ public class SessionCenter {
return socketSessionMap; return socketSessionMap;
} }
/**
* ID
*
* @return {@link ConcurrentMap< String, Set<String>>}
* @author majianguo
* @date 2021/6/1 2:14
**/
public static ConcurrentMap<String, Set<String>> getMessageTypeSessionMap() {
return messageTypeSessionMap;
}
/** /**
* ID * ID
* *
@ -70,66 +52,7 @@ public class SessionCenter {
* @date 2021/6/1 1:49 * @date 2021/6/1 1:49
**/ **/
public static void addSocketSession(SocketSession<GettySocketOperator> socketSession) { public static void addSocketSession(SocketSession<GettySocketOperator> socketSession) {
// 维护会话
socketSessionMap.put(socketSession.getUserId(), socketSession); socketSessionMap.put(socketSession.getUserId(), socketSession);
// 维护会话所有的消息类型和会话的关系
if (ObjectUtil.isNotEmpty(socketSession.getMessageTypes())) {
for (String messageType : socketSession.getMessageTypes()) {
Set<String> userIds = messageTypeSessionMap.get(messageType);
if (ObjectUtil.isEmpty(userIds)) {
userIds = new HashSet<>();
messageTypeSessionMap.put(messageType, userIds);
}
userIds.add(socketSession.getUserId());
}
}
}
/**
*
*
* @return {@link List< SocketSession<GettySocketOperator>>}
* @author majianguo
* @date 2021/6/1 2:06
**/
public static List<SocketSession<GettySocketOperator>> getSocketSessionByMsgType(String msgType) {
List<SocketSession<GettySocketOperator>> res = new ArrayList<>();
// 获取监听该消息所有的会话
Set<String> userIds = messageTypeSessionMap.get(msgType);
if (ObjectUtil.isNotEmpty(userIds)) {
for (String userId : userIds) {
SocketSession<GettySocketOperator> socketSession = socketSessionMap.get(userId);
res.add(socketSession);
}
}
return res;
}
/**
*
*
* @param msgType
* @param userId ID
* @author majianguo
* @date 2021/6/1 2:11
**/
public static void addSocketSessionMsgType(String msgType, String userId) {
// 维护Session信息
SocketSession<GettySocketOperator> socketSession = socketSessionMap.get(userId);
if (ObjectUtil.isNotEmpty(socketSession)) {
socketSession.getMessageTypes().add(msgType);
}
// 维护消息列表
Set<String> userIds = messageTypeSessionMap.get(msgType);
if (ObjectUtil.isEmpty(userIds)) {
userIds = new HashSet<>();
messageTypeSessionMap.put(msgType, userIds);
}
userIds.add(userId);
} }
/** /**
@ -141,8 +64,5 @@ public class SessionCenter {
**/ **/
public static void closed(String userId) { public static void closed(String userId) {
socketSessionMap.remove(userId); socketSessionMap.remove(userId);
for (Map.Entry<String, Set<String>> stringListEntry : messageTypeSessionMap.entrySet()) {
stringListEntry.getValue().removeIf(item -> item.equals(userId));
}
} }
} }