【7.0.4】【socket】整理代码

pull/22/head
fengshuonan 2021-06-12 09:19:00 +08:00
parent 01aa9f1b2e
commit 98a00adcf8
13 changed files with 52 additions and 48 deletions

View File

@ -1,6 +1,5 @@
package cn.stylefeng.roses.kernel.socket.api;
import cn.stylefeng.roses.kernel.socket.api.enums.ServerMessageTypeEnum;
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
/**
@ -45,4 +44,5 @@ public interface SocketOperatorApi {
* @date 2021/6/2 9:54
**/
void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface);
}

View File

@ -42,7 +42,7 @@ public class SocketConfigExpander {
* @date 2021/6/7 11:39
**/
public static String getSocketHost() {
return ConfigContext.me().getSysConfigValueWithDefault("socket_host", String.class, "0.0.0.0");
return ConfigContext.me().getSysConfigValueWithDefault("SOCKET_HOST", String.class, "0.0.0.0");
}
/**
@ -53,7 +53,7 @@ public class SocketConfigExpander {
* @date 2021/6/7 11:41
**/
public static Integer getSocketPort() {
return ConfigContext.me().getSysConfigValueWithDefault("socket_port", Integer.class, 11130);
return ConfigContext.me().getSysConfigValueWithDefault("SOCKET_PORT", Integer.class, 11130);
}
/**
@ -64,7 +64,7 @@ public class SocketConfigExpander {
* @date 2021/6/7 11:41
**/
public static Integer getSocketServerChunkSize() {
return ConfigContext.me().getSysConfigValueWithDefault("socket_server_chunk_size", Integer.class, 512 * 1024 * 1024);
return ConfigContext.me().getSysConfigValueWithDefault("SOCKET_SERVER_CHUNK_SIZE", Integer.class, 512 * 1024 * 1024);
}
}

View File

@ -67,4 +67,5 @@ public interface SocketSessionOperatorApi {
* @date 2021/6/1 11:50
**/
boolean isInvalid();
}

View File

@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.net.StandardSocketOptions;
/**
@ -35,6 +36,7 @@ public class WebSocketApplicationRunnerImpl implements ApplicationRunner {
aioServerConfig.setServerChunkSize(SocketConfigExpander.getSocketServerChunkSize());
// 设置SocketOptions
// 每个套接口都有一个发送缓冲区和一个接收缓冲区使用SO_RCVBUF可以改变缺省缓冲区大小。
aioServerConfig.setOption(StandardSocketOptions.SO_RCVBUF, 8192);
// 启动

View File

@ -17,8 +17,10 @@ public class SocketMessageCenter {
/**
*
* <p>
* keymsgTypevalue
*/
private static Map<String, SocketMsgCallbackInterface> messageListenerMap = new HashMap<>();
private static final Map<String, SocketMsgCallbackInterface> messageListenerMap = new HashMap<>();
/**
*
@ -43,4 +45,5 @@ public class SocketMessageCenter {
public static SocketMsgCallbackInterface getSocketMsgCallbackInterface(String msgType) {
return messageListenerMap.get(msgType);
}
}

View File

@ -1,20 +1,15 @@
package cn.stylefeng.roses.kernel.socket.websocket.operator;
import cn.hutool.core.util.ObjectUtil;
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
import cn.stylefeng.roses.kernel.socket.api.enums.ServerMessageTypeEnum;
import cn.stylefeng.roses.kernel.socket.api.exception.SocketException;
import cn.stylefeng.roses.kernel.socket.api.exception.enums.SocketExceptionEnum;
import cn.stylefeng.roses.kernel.socket.websocket.message.WebSocketMessagePOJO;
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
import cn.stylefeng.roses.kernel.socket.websocket.message.SocketMessageCenter;
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
import cn.stylefeng.roses.kernel.socket.websocket.message.SocketMessageCenter;
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
import cn.stylefeng.roses.kernel.socket.websocket.pojo.WebSocketMessageDTO;
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
import com.alibaba.fastjson.JSON;
import com.gettyio.expansion.handler.codec.websocket.frame.TextWebSocketFrame;
import java.util.List;
/**
* WebSocket
@ -33,21 +28,21 @@ public class WebSocketOperator implements SocketOperatorApi {
if (ObjectUtil.isEmpty(socketSession)) {
throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
}
WebSocketMessagePOJO webSocketMessagePOJO = new WebSocketMessagePOJO();
webSocketMessagePOJO.setData(msg);
webSocketMessagePOJO.setServerMsgType(msgType);
WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
webSocketMessageDTO.setData(msg);
webSocketMessageDTO.setServerMsgType(msgType);
// 发送内容
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessagePOJO);
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
}
@Override
public void sendMsgOfAllUserSession(String msgType, Object msg) {
for (SocketSession<GettySocketOperator> socketSession : SessionCenter.getSocketSessionMap().values()) {
WebSocketMessagePOJO webSocketMessagePOJO = new WebSocketMessagePOJO();
webSocketMessagePOJO.setData(msg);
webSocketMessagePOJO.setServerMsgType(msgType);
WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
webSocketMessageDTO.setData(msg);
webSocketMessageDTO.setServerMsgType(msgType);
// 发送内容
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessagePOJO);
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
}
}

View File

@ -1,6 +1,5 @@
package cn.stylefeng.roses.kernel.socket.websocket.operator.channel;
import cn.stylefeng.roses.kernel.socket.websocket.message.WebSocketMessagePOJO;
import com.alibaba.fastjson.JSON;
import com.gettyio.core.channel.SocketChannel;
import com.gettyio.expansion.handler.codec.websocket.frame.TextWebSocketFrame;

View File

@ -1,4 +1,4 @@
package cn.stylefeng.roses.kernel.socket.websocket.message;
package cn.stylefeng.roses.kernel.socket.websocket.pojo;
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
import lombok.Data;
@ -9,15 +9,16 @@ import lombok.Data;
* serverMsgTypeclientMsgType
* 1.serverMsgType
* (type:100001),
* <p>
* 2.clientMsgType
* (type:299999)
* {@link SocketOperatorApi#msgTypeCallback}
* {@link SocketOperatorApi#msgTypeCallback}
*
* @author majianguo
* @date 2021/6/1 2:56
*/
@Data
public class WebSocketMessagePOJO {
public class WebSocketMessageDTO {
/**
* ()
@ -40,7 +41,8 @@ public class WebSocketMessagePOJO {
private String formUserId;
/**
*
*
*/
private Object data;
}

View File

@ -27,6 +27,6 @@ public class WebSocketInitializer extends ChannelInitializer {
// 添加自定义的消息处理器
pipeline.addLast(new WebSocketMessageHandler());
}
}

View File

@ -19,13 +19,15 @@ public class ChannelIdAndUserBindCenter {
/**
*
* <p>
* keychannelIdidvalueuserId
*/
private static ConcurrentMap<String, String> channelIdAndUserBind = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, String> channelIdAndUserBind = new ConcurrentHashMap<>();
/**
*
*/
private static List<SocketChannel> waitingBindList = Collections.synchronizedList(new ArrayList<>());
private static final List<SocketChannel> waitingBindList = Collections.synchronizedList(new ArrayList<>());
/**
* ID
@ -95,4 +97,5 @@ public class ChannelIdAndUserBindCenter {
waitingBindList.removeIf(item -> item.getChannelId().equals(channelId));
channelIdAndUserBind.remove(channelId);
}
}

View File

@ -3,12 +3,12 @@ package cn.stylefeng.roses.kernel.socket.websocket.server.handler;
import cn.hutool.core.util.ObjectUtil;
import cn.stylefeng.roses.kernel.socket.api.enums.ClientMessageTypeEnum;
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
import cn.stylefeng.roses.kernel.socket.websocket.message.SocketMessageCenter;
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
import cn.stylefeng.roses.kernel.socket.websocket.server.bind.ChannelIdAndUserBindCenter;
import cn.stylefeng.roses.kernel.socket.websocket.message.SocketMessageCenter;
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
import cn.stylefeng.roses.kernel.socket.websocket.message.WebSocketMessagePOJO;
import cn.stylefeng.roses.kernel.socket.websocket.pojo.WebSocketMessageDTO;
import cn.stylefeng.roses.kernel.socket.websocket.server.bind.ChannelIdAndUserBindCenter;
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
import com.alibaba.fastjson.JSON;
import com.gettyio.core.channel.SocketChannel;
import com.gettyio.core.pipeline.in.SimpleChannelInboundHandler;
@ -52,10 +52,10 @@ public class WebSocketMessageHandler extends SimpleChannelInboundHandler<WebSock
String data = new String(webSocketFrame.getPayloadData(), StandardCharsets.UTF_8);
// 转换为Java对象
WebSocketMessagePOJO webSocketMessagePOJO = JSON.toJavaObject(JSON.parseObject(data), WebSocketMessagePOJO.class);
WebSocketMessageDTO webSocketMessageDTO = JSON.toJavaObject(JSON.parseObject(data), WebSocketMessageDTO.class);
// 心跳包
if (ClientMessageTypeEnum.USER_HEART.getCode().equals(webSocketMessagePOJO.getClientMsgType())) {
if (ClientMessageTypeEnum.USER_HEART.getCode().equals(webSocketMessageDTO.getClientMsgType())) {
// 更新用户最后活跃时间
String userId = ChannelIdAndUserBindCenter.getUserId(socketChannel.getChannelId());
if (ObjectUtil.isNotEmpty(userId)) {
@ -65,17 +65,17 @@ public class WebSocketMessageHandler extends SimpleChannelInboundHandler<WebSock
}
// 用户ID为空不处理直接跳过
if (ObjectUtil.isEmpty(webSocketMessagePOJO.getFormUserId())) {
if (ObjectUtil.isEmpty(webSocketMessageDTO.getFormUserId())) {
return;
}
// 维护通道和用户ID的绑定关系
if (!ChannelIdAndUserBindCenter.isBind(webSocketMessagePOJO.getFormUserId())) {
ChannelIdAndUserBindCenter.bind(socketChannel.getChannelId(), webSocketMessagePOJO.getFormUserId());
if (!ChannelIdAndUserBindCenter.isBind(webSocketMessageDTO.getFormUserId())) {
ChannelIdAndUserBindCenter.bind(socketChannel.getChannelId(), webSocketMessageDTO.getFormUserId());
// 创建api的会话对象
SocketSession<GettySocketOperator> socketSession = new SocketSession<>();
socketSession.setUserId(webSocketMessagePOJO.getFormUserId());
socketSession.setUserId(webSocketMessageDTO.getFormUserId());
socketSession.setSocketOperatorApi(new GettySocketOperator(socketChannel));
socketSession.setConnectionTime(System.currentTimeMillis());
@ -84,20 +84,21 @@ public class WebSocketMessageHandler extends SimpleChannelInboundHandler<WebSock
}
// 更新最后会话时间
SocketSession<GettySocketOperator> userSession = SessionCenter.getSessionByUserId(webSocketMessagePOJO.getFormUserId());
SocketSession<GettySocketOperator> userSession = SessionCenter.getSessionByUserId(webSocketMessageDTO.getFormUserId());
userSession.setLastActiveTime(System.currentTimeMillis());
// 找到该消息的处理器
SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(webSocketMessagePOJO.getClientMsgType());
SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(webSocketMessageDTO.getClientMsgType());
if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
// 获取会话
SocketSession<GettySocketOperator> session = SessionCenter.getSessionByUserId(webSocketMessagePOJO.getFormUserId());
SocketSession<GettySocketOperator> session = SessionCenter.getSessionByUserId(webSocketMessageDTO.getFormUserId());
// 触发回调
socketMsgCallbackInterface.callback(webSocketMessagePOJO.getClientMsgType(), webSocketMessagePOJO, session);
socketMsgCallbackInterface.callback(webSocketMessageDTO.getClientMsgType(), webSocketMessageDTO, session);
} else {
socketChannel.writeAndFlush(new TextWebSocketFrame("{\"code\":\"404\"}"));
}
}
}
}

View File

@ -19,12 +19,11 @@ public class SessionCenter {
/**
*
*/
private static ConcurrentMap<String, SocketSession<GettySocketOperator>> socketSessionMap = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, SocketSession<GettySocketOperator>> socketSessionMap = new ConcurrentHashMap<>();
/**
*
*
* @return {@link ConcurrentMap< String, SocketSession<GettySocketOperator>>}
* @author majianguo
* @date 2021/6/1 2:13
**/
@ -36,7 +35,6 @@ public class SessionCenter {
* ID
*
* @param userId ID
* @return {@link SocketSession <GettySocketOperator>}
* @author majianguo
* @date 2021/6/1 1:48
**/
@ -65,4 +63,5 @@ public class SessionCenter {
public static void closed(String userId) {
socketSessionMap.remove(userId);
}
}

View File

@ -42,7 +42,6 @@ public class GunsSocketAutoConfiguration {
/**
* Socket
*
* @return {@link cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi}
* @author majianguo
* @date 2021/6/2 5:48
**/