mirror of https://gitee.com/stylefeng/roses
整理WebSocket实现
parent
038be58952
commit
3df9df4ada
|
@ -15,6 +15,19 @@ public interface SocketOperatorApi {
|
||||||
/**
|
/**
|
||||||
* 发送消息到指定会话
|
* 发送消息到指定会话
|
||||||
*
|
*
|
||||||
|
* @param msgType 消息类型可参考{@link cn.stylefeng.roses.kernel.socket.api.enums}枚举类
|
||||||
|
* @param sessionId 会话ID
|
||||||
|
* @param msg 消息体
|
||||||
|
* @author majianguo
|
||||||
|
* @date 2021/6/11 下午2:19
|
||||||
|
**/
|
||||||
|
void sendMsgOfUserSessionBySessionId(String msgType, String sessionId, Object msg);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送消息到指定用户的所有会话
|
||||||
|
* <p>
|
||||||
|
* 如果用户同一个消息类型建立了多个会话,则统一全部发送
|
||||||
|
*
|
||||||
* @param msgType 消息类型可参考{@link cn.stylefeng.roses.kernel.socket.api.enums}枚举类
|
* @param msgType 消息类型可参考{@link cn.stylefeng.roses.kernel.socket.api.enums}枚举类
|
||||||
* @param userId 用户ID
|
* @param userId 用户ID
|
||||||
* @param msg 消息体
|
* @param msg 消息体
|
||||||
|
@ -44,5 +57,4 @@ public interface SocketOperatorApi {
|
||||||
* @date 2021/6/2 上午9:54
|
* @date 2021/6/2 上午9:54
|
||||||
**/
|
**/
|
||||||
void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface);
|
void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,11 @@ import lombok.Getter;
|
||||||
@Getter
|
@Getter
|
||||||
public enum ClientMessageTypeEnum {
|
public enum ClientMessageTypeEnum {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 用户连接鉴权
|
||||||
|
*/
|
||||||
|
USER_CONNECTION_AUTHENTICATION("200000", "用户连接鉴权"),
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 用户心跳消息类型
|
* 用户心跳消息类型
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -16,7 +16,12 @@ public enum ServerMessageTypeEnum {
|
||||||
/**
|
/**
|
||||||
* 系统通知消息类型
|
* 系统通知消息类型
|
||||||
*/
|
*/
|
||||||
SYS_NOTICE_MSG_TYPE("100001", "系统通知消息类型");
|
SYS_NOTICE_MSG_TYPE("100001", "系统通知消息类型"),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 连接消息回复
|
||||||
|
*/
|
||||||
|
SYS_REPLY_MSG_TYPE("100002", "连接消息回复");
|
||||||
|
|
||||||
private final String code;
|
private final String code;
|
||||||
|
|
||||||
|
|
|
@ -66,5 +66,4 @@ public class SocketConfigExpander {
|
||||||
public static Integer getSocketServerChunkSize() {
|
public static Integer getSocketServerChunkSize() {
|
||||||
return ConfigContext.me().getSysConfigValueWithDefault("SOCKET_SERVER_CHUNK_SIZE", Integer.class, 512 * 1024 * 1024);
|
return ConfigContext.me().getSysConfigValueWithDefault("SOCKET_SERVER_CHUNK_SIZE", Integer.class, 512 * 1024 * 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,5 +67,4 @@ public interface SocketSessionOperatorApi {
|
||||||
* @date 2021/6/1 上午11:50
|
* @date 2021/6/1 上午11:50
|
||||||
**/
|
**/
|
||||||
boolean isInvalid();
|
boolean isInvalid();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,15 +15,20 @@ import java.util.Set;
|
||||||
@Data
|
@Data
|
||||||
public class SocketSession<T extends SocketSessionOperatorApi> {
|
public class SocketSession<T extends SocketSessionOperatorApi> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 会话ID,每一个新建的会话都有(目前使用通道ID)
|
||||||
|
*/
|
||||||
|
private String sessionId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 会话唯一标识
|
* 会话唯一标识
|
||||||
*/
|
*/
|
||||||
private String userId;
|
private String userId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 该会话所有的监听消息类型
|
* 该会话监听的消息类型
|
||||||
*/
|
*/
|
||||||
private Set<String> messageTypes = new HashSet<>();
|
private String messageType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 连接时间
|
* 连接时间
|
||||||
|
|
|
@ -7,7 +7,6 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.boot.ApplicationArguments;
|
import org.springframework.boot.ApplicationArguments;
|
||||||
import org.springframework.boot.ApplicationRunner;
|
import org.springframework.boot.ApplicationRunner;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.net.StandardSocketOptions;
|
import java.net.StandardSocketOptions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -36,7 +35,6 @@ public class WebSocketApplicationRunnerImpl implements ApplicationRunner {
|
||||||
aioServerConfig.setServerChunkSize(SocketConfigExpander.getSocketServerChunkSize());
|
aioServerConfig.setServerChunkSize(SocketConfigExpander.getSocketServerChunkSize());
|
||||||
|
|
||||||
// 设置SocketOptions
|
// 设置SocketOptions
|
||||||
// 每个套接口都有一个发送缓冲区和一个接收缓冲区,使用SO_RCVBUF可以改变缺省缓冲区大小。
|
|
||||||
aioServerConfig.setOption(StandardSocketOptions.SO_RCVBUF, 8192);
|
aioServerConfig.setOption(StandardSocketOptions.SO_RCVBUF, 8192);
|
||||||
|
|
||||||
// 启动
|
// 启动
|
||||||
|
|
|
@ -20,5 +20,12 @@
|
||||||
<artifactId>socket-api</artifactId>
|
<artifactId>socket-api</artifactId>
|
||||||
<version>${roses.version}</version>
|
<version>${roses.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!--jwt模块的sdk-->
|
||||||
|
<dependency>
|
||||||
|
<groupId>cn.stylefeng.roses</groupId>
|
||||||
|
<artifactId>jwt-sdk</artifactId>
|
||||||
|
<version>${roses.version}</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
|
@ -1,7 +1,6 @@
|
||||||
package cn.stylefeng.roses.kernel.socket.websocket.message;
|
package cn.stylefeng.roses.kernel.socket.websocket.message;
|
||||||
|
|
||||||
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
|
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -17,10 +16,8 @@ public class SocketMessageCenter {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 所有消息监听器维护
|
* 所有消息监听器维护
|
||||||
* <p>
|
|
||||||
* key是msgType,value是消息回调监听器
|
|
||||||
*/
|
*/
|
||||||
private static final Map<String, SocketMsgCallbackInterface> messageListenerMap = new HashMap<>();
|
private static Map<String, SocketMsgCallbackInterface> messageListenerMap = new HashMap<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 设置消息类型的监听器
|
* 设置消息类型的监听器
|
||||||
|
@ -45,5 +42,4 @@ public class SocketMessageCenter {
|
||||||
public static SocketMsgCallbackInterface getSocketMsgCallbackInterface(String msgType) {
|
public static SocketMsgCallbackInterface getSocketMsgCallbackInterface(String msgType) {
|
||||||
return messageListenerMap.get(msgType);
|
return messageListenerMap.get(msgType);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,9 @@ import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOp
|
||||||
import cn.stylefeng.roses.kernel.socket.websocket.pojo.WebSocketMessageDTO;
|
import cn.stylefeng.roses.kernel.socket.websocket.pojo.WebSocketMessageDTO;
|
||||||
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
|
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* WebSocket操作实现类
|
* WebSocket操作实现类
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -22,27 +25,46 @@ import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
|
||||||
public class WebSocketOperator implements SocketOperatorApi {
|
public class WebSocketOperator implements SocketOperatorApi {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendMsgOfUserSession(String msgType, String userId, Object msg) {
|
public void sendMsgOfUserSessionBySessionId(String msgType, String sessionId, Object msg) {
|
||||||
// 根据用户ID获取会话
|
SocketSession<GettySocketOperator> session = SessionCenter.getSessionBySessionId(sessionId);
|
||||||
SocketSession<GettySocketOperator> socketSession = SessionCenter.getSessionByUserId(userId);
|
if (ObjectUtil.isEmpty(session)) {
|
||||||
if (ObjectUtil.isEmpty(socketSession)) {
|
|
||||||
throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
|
throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
|
||||||
}
|
}
|
||||||
WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
|
WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
|
||||||
webSocketMessageDTO.setData(msg);
|
webSocketMessageDTO.setData(msg);
|
||||||
webSocketMessageDTO.setServerMsgType(msgType);
|
webSocketMessageDTO.setServerMsgType(msgType);
|
||||||
// 发送内容
|
session.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
|
||||||
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sendMsgOfUserSession(String msgType, String userId, Object msg) {
|
||||||
|
// 根据用户ID获取会话
|
||||||
|
List<SocketSession<GettySocketOperator>> socketSessionList = SessionCenter.getSessionByUserIdAndMsgType(userId);
|
||||||
|
if (ObjectUtil.isEmpty(socketSessionList)) {
|
||||||
|
throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
|
||||||
|
}
|
||||||
|
WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
|
||||||
|
webSocketMessageDTO.setData(msg);
|
||||||
|
webSocketMessageDTO.setServerMsgType(msgType);
|
||||||
|
for (SocketSession<GettySocketOperator> session : socketSessionList) {
|
||||||
|
// 发送内容
|
||||||
|
session.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendMsgOfAllUserSession(String msgType, Object msg) {
|
public void sendMsgOfAllUserSession(String msgType, Object msg) {
|
||||||
for (SocketSession<GettySocketOperator> socketSession : SessionCenter.getSocketSessionMap().values()) {
|
Collection<List<SocketSession<GettySocketOperator>>> values = SessionCenter.getSocketSessionMap().values();
|
||||||
WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
|
WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
|
||||||
webSocketMessageDTO.setData(msg);
|
webSocketMessageDTO.setData(msg);
|
||||||
webSocketMessageDTO.setServerMsgType(msgType);
|
webSocketMessageDTO.setServerMsgType(msgType);
|
||||||
// 发送内容
|
for (List<SocketSession<GettySocketOperator>> sessions : values) {
|
||||||
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
|
for (SocketSession<GettySocketOperator> session : sessions) {
|
||||||
|
// 找到该类型的通道
|
||||||
|
if (session.getMessageType().equals(msgType)) {
|
||||||
|
session.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,5 +28,4 @@ public class WebSocketInitializer extends ChannelInitializer {
|
||||||
// 添加自定义的消息处理器
|
// 添加自定义的消息处理器
|
||||||
pipeline.addLast(new WebSocketMessageHandler());
|
pipeline.addLast(new WebSocketMessageHandler());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,101 +0,0 @@
|
||||||
package cn.stylefeng.roses.kernel.socket.websocket.server.bind;
|
|
||||||
|
|
||||||
import com.gettyio.core.channel.SocketChannel;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 通道和用户的绑定中心
|
|
||||||
*
|
|
||||||
* @author majianguo
|
|
||||||
* @date 2021/6/1 下午3:09
|
|
||||||
*/
|
|
||||||
public class ChannelIdAndUserBindCenter {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 通道和用户绑定关系映射
|
|
||||||
* <p>
|
|
||||||
* key是channelId通道id,value是userId
|
|
||||||
*/
|
|
||||||
private static final ConcurrentMap<String, String> channelIdAndUserBind = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 等待绑定的通道
|
|
||||||
*/
|
|
||||||
private static final List<SocketChannel> waitingBindList = Collections.synchronizedList(new ArrayList<>());
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取通道ID
|
|
||||||
*
|
|
||||||
* @param channelId 通道ID
|
|
||||||
* @return {@link java.lang.String}
|
|
||||||
* @author majianguo
|
|
||||||
* @date 2021/6/1 下午3:33
|
|
||||||
**/
|
|
||||||
public static String getUserId(String channelId) {
|
|
||||||
return channelIdAndUserBind.get(channelId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 添加一个未绑定的通道
|
|
||||||
*
|
|
||||||
* @param socketChannel 通道对象
|
|
||||||
* @author majianguo
|
|
||||||
* @date 2021/6/1 下午3:17
|
|
||||||
**/
|
|
||||||
public static void addSocketChannel(SocketChannel socketChannel) {
|
|
||||||
waitingBindList.add(socketChannel);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 绑定关系
|
|
||||||
*
|
|
||||||
* @param channelId 通道ID
|
|
||||||
* @param userId 用户ID
|
|
||||||
* @return {@link boolean}
|
|
||||||
* @author majianguo
|
|
||||||
* @date 2021/6/1 下午3:21
|
|
||||||
**/
|
|
||||||
public static boolean bind(String channelId, String userId) {
|
|
||||||
Iterator<SocketChannel> iterator = waitingBindList.iterator();
|
|
||||||
while (iterator.hasNext()) {
|
|
||||||
SocketChannel item = iterator.next();
|
|
||||||
if (item.getChannelId().equals(channelId)) {
|
|
||||||
channelIdAndUserBind.put(channelId, userId);
|
|
||||||
iterator.remove();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 用户是否已绑定通道
|
|
||||||
*
|
|
||||||
* @param userId 用户ID
|
|
||||||
* @return {@link boolean}
|
|
||||||
* @author majianguo
|
|
||||||
* @date 2021/6/1 下午3:29
|
|
||||||
**/
|
|
||||||
public static boolean isBind(String userId) {
|
|
||||||
return channelIdAndUserBind.containsValue(userId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 关闭通道
|
|
||||||
*
|
|
||||||
* @param channelId 通道ID
|
|
||||||
* @author majianguo
|
|
||||||
* @date 2021/6/1 下午3:31
|
|
||||||
**/
|
|
||||||
public static void closed(String channelId) {
|
|
||||||
waitingBindList.removeIf(item -> item.getChannelId().equals(channelId));
|
|
||||||
channelIdAndUserBind.remove(channelId);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,14 +1,15 @@
|
||||||
package cn.stylefeng.roses.kernel.socket.websocket.server.handler;
|
package cn.stylefeng.roses.kernel.socket.websocket.server.handler;
|
||||||
|
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
|
import cn.stylefeng.roses.kernel.jwt.api.context.JwtContext;
|
||||||
import cn.stylefeng.roses.kernel.socket.api.enums.ClientMessageTypeEnum;
|
import cn.stylefeng.roses.kernel.socket.api.enums.ClientMessageTypeEnum;
|
||||||
|
import cn.stylefeng.roses.kernel.socket.api.enums.ServerMessageTypeEnum;
|
||||||
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
|
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
|
||||||
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
|
||||||
import cn.stylefeng.roses.kernel.socket.websocket.message.SocketMessageCenter;
|
import cn.stylefeng.roses.kernel.socket.websocket.message.SocketMessageCenter;
|
||||||
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
|
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
||||||
import cn.stylefeng.roses.kernel.socket.websocket.pojo.WebSocketMessageDTO;
|
import cn.stylefeng.roses.kernel.socket.websocket.pojo.WebSocketMessageDTO;
|
||||||
import cn.stylefeng.roses.kernel.socket.websocket.server.bind.ChannelIdAndUserBindCenter;
|
|
||||||
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
|
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
|
||||||
|
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.gettyio.core.channel.SocketChannel;
|
import com.gettyio.core.channel.SocketChannel;
|
||||||
import com.gettyio.core.pipeline.in.SimpleChannelInboundHandler;
|
import com.gettyio.core.pipeline.in.SimpleChannelInboundHandler;
|
||||||
|
@ -30,19 +31,12 @@ public class WebSocketMessageHandler extends SimpleChannelInboundHandler<WebSock
|
||||||
@Override
|
@Override
|
||||||
public void channelAdded(SocketChannel aioChannel) {
|
public void channelAdded(SocketChannel aioChannel) {
|
||||||
log.info(aioChannel.getChannelId() + " connection successful.");
|
log.info(aioChannel.getChannelId() + " connection successful.");
|
||||||
ChannelIdAndUserBindCenter.addSocketChannel(aioChannel);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelClosed(SocketChannel aioChannel) {
|
public void channelClosed(SocketChannel aioChannel) {
|
||||||
log.info(aioChannel.getChannelId() + " disconnected");
|
log.info(aioChannel.getChannelId() + " disconnected");
|
||||||
// 获取用户ID
|
SessionCenter.closed(aioChannel.getChannelId());
|
||||||
String userId = ChannelIdAndUserBindCenter.getUserId(aioChannel.getChannelId());
|
|
||||||
if (ObjectUtil.isNotEmpty(userId)) {
|
|
||||||
// 根据用户ID关闭会话
|
|
||||||
SessionCenter.closed(userId);
|
|
||||||
}
|
|
||||||
ChannelIdAndUserBindCenter.closed(aioChannel.getChannelId());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -52,53 +46,71 @@ public class WebSocketMessageHandler extends SimpleChannelInboundHandler<WebSock
|
||||||
String data = new String(webSocketFrame.getPayloadData(), StandardCharsets.UTF_8);
|
String data = new String(webSocketFrame.getPayloadData(), StandardCharsets.UTF_8);
|
||||||
|
|
||||||
// 转换为Java对象
|
// 转换为Java对象
|
||||||
WebSocketMessageDTO webSocketMessageDTO = JSON.toJavaObject(JSON.parseObject(data), WebSocketMessageDTO.class);
|
WebSocketMessageDTO WebSocketMessageDTO = JSON.toJavaObject(JSON.parseObject(data), WebSocketMessageDTO.class);
|
||||||
|
|
||||||
// 心跳包
|
// 心跳包
|
||||||
if (ClientMessageTypeEnum.USER_HEART.getCode().equals(webSocketMessageDTO.getClientMsgType())) {
|
if (ClientMessageTypeEnum.USER_HEART.getCode().equals(WebSocketMessageDTO.getClientMsgType())) {
|
||||||
// 更新用户最后活跃时间
|
// 更新会话最后活跃时间
|
||||||
String userId = ChannelIdAndUserBindCenter.getUserId(socketChannel.getChannelId());
|
SocketSession<GettySocketOperator> session = SessionCenter.getSessionBySessionId(socketChannel.getChannelId());
|
||||||
if (ObjectUtil.isNotEmpty(userId)) {
|
if (ObjectUtil.isNotEmpty(session)) {
|
||||||
SocketSession<GettySocketOperator> session = SessionCenter.getSessionByUserId(userId);
|
|
||||||
session.setLastActiveTime(System.currentTimeMillis());
|
session.setLastActiveTime(System.currentTimeMillis());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 用户ID为空不处理直接跳过
|
// 用户ID为空不处理直接跳过
|
||||||
if (ObjectUtil.isEmpty(webSocketMessageDTO.getFormUserId())) {
|
if (ObjectUtil.isEmpty(WebSocketMessageDTO.getFormUserId())) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 维护通道和用户ID的绑定关系
|
// 维护通道是否已初始化
|
||||||
if (!ChannelIdAndUserBindCenter.isBind(webSocketMessageDTO.getFormUserId())) {
|
SocketSession<GettySocketOperator> socketSession = SessionCenter.getSessionBySessionId(socketChannel.getChannelId());
|
||||||
ChannelIdAndUserBindCenter.bind(socketChannel.getChannelId(), webSocketMessageDTO.getFormUserId());
|
if (ObjectUtil.isEmpty(socketSession) && ClientMessageTypeEnum.USER_CONNECTION_AUTHENTICATION.getCode().equals(WebSocketMessageDTO.getClientMsgType())) {
|
||||||
|
// 操作api包装
|
||||||
|
GettySocketOperator gettySocketOperator = new GettySocketOperator(socketChannel);
|
||||||
|
|
||||||
// 创建api的会话对象
|
// 回复消息
|
||||||
SocketSession<GettySocketOperator> socketSession = new SocketSession<>();
|
WebSocketMessageDTO replyMsg = new WebSocketMessageDTO();
|
||||||
socketSession.setUserId(webSocketMessageDTO.getFormUserId());
|
replyMsg.setServerMsgType(ServerMessageTypeEnum.SYS_REPLY_MSG_TYPE.getCode());
|
||||||
socketSession.setSocketOperatorApi(new GettySocketOperator(socketChannel));
|
replyMsg.setToUserId(WebSocketMessageDTO.getFormUserId());
|
||||||
socketSession.setConnectionTime(System.currentTimeMillis());
|
|
||||||
|
|
||||||
// 维护会话
|
try {
|
||||||
SessionCenter.addSocketSession(socketSession);
|
// 校验token是否合法
|
||||||
|
JwtContext.me().validateTokenWithException(WebSocketMessageDTO.getData().toString());
|
||||||
|
|
||||||
|
// 设置回复内容
|
||||||
|
replyMsg.setData(socketChannel.getChannelId());
|
||||||
|
|
||||||
|
// 创建会话对象
|
||||||
|
socketSession = new SocketSession<>();
|
||||||
|
socketSession.setSessionId(socketChannel.getChannelId());
|
||||||
|
socketSession.setUserId(WebSocketMessageDTO.getFormUserId());
|
||||||
|
socketSession.setSocketOperatorApi(gettySocketOperator);
|
||||||
|
socketSession.setConnectionTime(System.currentTimeMillis());
|
||||||
|
|
||||||
|
// 维护会话
|
||||||
|
SessionCenter.addSocketSession(socketSession);
|
||||||
|
} finally {
|
||||||
|
// 回复消息
|
||||||
|
gettySocketOperator.writeAndFlush(replyMsg);
|
||||||
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 更新最后会话时间
|
// 会话建立成功执行业务逻辑
|
||||||
SocketSession<GettySocketOperator> userSession = SessionCenter.getSessionByUserId(webSocketMessageDTO.getFormUserId());
|
if (ObjectUtil.isNotEmpty(socketSession)) {
|
||||||
userSession.setLastActiveTime(System.currentTimeMillis());
|
|
||||||
|
|
||||||
// 找到该消息的处理器
|
// 更新最后会话时间
|
||||||
SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(webSocketMessageDTO.getClientMsgType());
|
socketSession.setLastActiveTime(System.currentTimeMillis());
|
||||||
if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
|
|
||||||
// 获取会话
|
|
||||||
SocketSession<GettySocketOperator> session = SessionCenter.getSessionByUserId(webSocketMessageDTO.getFormUserId());
|
|
||||||
|
|
||||||
// 触发回调
|
// 找到该消息的处理器
|
||||||
socketMsgCallbackInterface.callback(webSocketMessageDTO.getClientMsgType(), webSocketMessageDTO, session);
|
SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(WebSocketMessageDTO.getClientMsgType());
|
||||||
} else {
|
if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
|
||||||
socketChannel.writeAndFlush(new TextWebSocketFrame("{\"code\":\"404\"}"));
|
// 触发回调
|
||||||
|
socketMsgCallbackInterface.callback(WebSocketMessageDTO.getClientMsgType(), WebSocketMessageDTO, socketSession);
|
||||||
|
} else {
|
||||||
|
socketChannel.writeAndFlush(new TextWebSocketFrame("{\"serverMsgType\":\"404\"}"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
package cn.stylefeng.roses.kernel.socket.websocket.session;
|
package cn.stylefeng.roses.kernel.socket.websocket.session;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
||||||
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
|
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
@ -19,29 +22,62 @@ public class SessionCenter {
|
||||||
/**
|
/**
|
||||||
* 所有会话维护
|
* 所有会话维护
|
||||||
*/
|
*/
|
||||||
private static final ConcurrentMap<String, SocketSession<GettySocketOperator>> socketSessionMap = new ConcurrentHashMap<>();
|
private static ConcurrentMap<String, List<SocketSession<GettySocketOperator>>> socketSessionMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取维护的所有会话
|
* 获取维护的所有会话
|
||||||
*
|
*
|
||||||
|
* @return {@link ConcurrentMap< String, SocketSession<GettySocketOperator>>}
|
||||||
* @author majianguo
|
* @author majianguo
|
||||||
* @date 2021/6/1 下午2:13
|
* @date 2021/6/1 下午2:13
|
||||||
**/
|
**/
|
||||||
public static ConcurrentMap<String, SocketSession<GettySocketOperator>> getSocketSessionMap() {
|
public static ConcurrentMap<String, List<SocketSession<GettySocketOperator>>> getSocketSessionMap() {
|
||||||
return socketSessionMap;
|
return socketSessionMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 根据用户ID获取会话详情
|
* 根据用户ID获取会话信息列表
|
||||||
*
|
*
|
||||||
* @param userId 用户ID
|
* @param userId 用户ID
|
||||||
|
* @return {@link SocketSession <GettySocketOperator>}
|
||||||
* @author majianguo
|
* @author majianguo
|
||||||
* @date 2021/6/1 下午1:48
|
* @date 2021/6/1 下午1:48
|
||||||
**/
|
**/
|
||||||
public static SocketSession<GettySocketOperator> getSessionByUserId(String userId) {
|
public static List<SocketSession<GettySocketOperator>> getSessionByUserId(String userId) {
|
||||||
return socketSessionMap.get(userId);
|
return socketSessionMap.get(userId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据用户ID和消息类型获取会话信息列表
|
||||||
|
*
|
||||||
|
* @param userId 用户ID
|
||||||
|
* @return {@link SocketSession <GettySocketOperator>}
|
||||||
|
* @author majianguo
|
||||||
|
* @date 2021/6/1 下午1:48
|
||||||
|
**/
|
||||||
|
public static List<SocketSession<GettySocketOperator>> getSessionByUserIdAndMsgType(String userId) {
|
||||||
|
return socketSessionMap.get(userId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据会话ID获取会话信息
|
||||||
|
*
|
||||||
|
* @param sessionId 会话ID
|
||||||
|
* @return {@link SocketSession <GettySocketOperator>}
|
||||||
|
* @author majianguo
|
||||||
|
* @date 2021/6/1 下午1:48
|
||||||
|
**/
|
||||||
|
public static SocketSession<GettySocketOperator> getSessionBySessionId(String sessionId) {
|
||||||
|
for (List<SocketSession<GettySocketOperator>> values : socketSessionMap.values()) {
|
||||||
|
for (SocketSession<GettySocketOperator> session : values) {
|
||||||
|
if (sessionId.equals(session.getSessionId())) {
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 设置会话
|
* 设置会话
|
||||||
*
|
*
|
||||||
|
@ -50,18 +86,24 @@ public class SessionCenter {
|
||||||
* @date 2021/6/1 下午1:49
|
* @date 2021/6/1 下午1:49
|
||||||
**/
|
**/
|
||||||
public static void addSocketSession(SocketSession<GettySocketOperator> socketSession) {
|
public static void addSocketSession(SocketSession<GettySocketOperator> socketSession) {
|
||||||
socketSessionMap.put(socketSession.getUserId(), socketSession);
|
List<SocketSession<GettySocketOperator>> socketSessions = socketSessionMap.get(socketSession.getUserId());
|
||||||
|
if (ObjectUtil.isEmpty(socketSessions)) {
|
||||||
|
socketSessions = new ArrayList<>();
|
||||||
|
socketSessionMap.put(socketSession.getUserId(), socketSessions);
|
||||||
|
}
|
||||||
|
socketSessions.add(socketSession);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 连接关闭
|
* 连接关闭
|
||||||
*
|
*
|
||||||
* @param userId 用户ID
|
* @param sessionId 会话ID
|
||||||
* @author majianguo
|
* @author majianguo
|
||||||
* @date 2021/6/1 下午3:25
|
* @date 2021/6/1 下午3:25
|
||||||
**/
|
**/
|
||||||
public static void closed(String userId) {
|
public static void closed(String sessionId) {
|
||||||
socketSessionMap.remove(userId);
|
for (List<SocketSession<GettySocketOperator>> values : socketSessionMap.values()) {
|
||||||
|
values.removeIf(item -> item.getSessionId().equals(sessionId));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue