新增Socket模块,默认实现WebSocket

pull/22/head
rays 2021-06-02 17:14:23 +08:00
parent 2b8ec525b6
commit 7858b13c29
29 changed files with 1298 additions and 0 deletions

37
kernel-d-socket/pom.xml Normal file
View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>roses-kernel</artifactId>
<version>7.0.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>kernel-d-socket</artifactId>
<packaging>pom</packaging>
<modules>
<module>socket-api</module>
<module>socket-sdk-websocket</module>
<module>socket-business-websocket</module>
<module>socket-spring-boot-starter</module>
</modules>
<dependencies>
<!-- 开发规则 -->
<dependency>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>kernel-a-rule</artifactId>
<version>7.0.4</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1 @@
socket模块的api

View File

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>kernel-d-socket</artifactId>
<version>7.0.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>socket-api</artifactId>
<packaging>jar</packaging>
<dependencies>
<!--config模块的api-->
<dependency>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>config-api</artifactId>
<version>7.0.4</version>
</dependency>
<!-- Getty核心包 -->
<dependency>
<groupId>com.gettyio</groupId>
<artifactId>getty-core</artifactId>
</dependency>
<!-- 拓展包 -->
<dependency>
<groupId>com.gettyio</groupId>
<artifactId>getty-expansion</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,24 @@
package cn.stylefeng.roses.kernel.socket.api;
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
/**
* Socket
*
* @author majianguo
* @date 2021/6/2 9:53
*/
@FunctionalInterface
public interface SocketMsgCallbackInterface {
/**
*
*
* @param msgType
* @param msg
* @param socketSession
* @author majianguo
* @date 2021/6/2 9:51
**/
void callback(String msgType, Object msg, SocketSession socketSession);
}

View File

@ -0,0 +1,43 @@
package cn.stylefeng.roses.kernel.socket.api;
/**
* Socket
* <p>
* SocketAPI
*
* @author majianguo
* @date 2021/6/2 9:25
*/
public interface SocketOperatorApi {
/**
*
*
* @param sessionId ID(IDWebSocket使formId)
* @param msg
* @author majianguo
* @date 2021/6/2 9:35
**/
void sendMsgOfSession(String sessionId, Object msg);
/**
*
*
* @param msg
* @author majianguo
* @date 2021/6/2 9:35
**/
void sendMsgOfAllSession(Object msg);
/**
*
* <p>
* 1.,
*
* @param msgType
* @param callbackInterface
* @author majianguo
* @date 2021/6/2 9:54
**/
void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface);
}

View File

@ -0,0 +1,60 @@
/*
* Copyright [2020-2030] [https://www.stylefeng.cn]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* GunsAPACHE LICENSE 2.0使
*
* 1.LICENSE
* 2.Guns
* 3.
* 4. https://gitee.com/stylefeng/guns
* 5. https://gitee.com/stylefeng/guns
* 6.
*/
package cn.stylefeng.roses.kernel.socket.api.constants;
/**
* socket
*
* @author majianguo
* @date 2021/6/1 11:21
*/
public interface SocketConstants {
/**
* socket
*/
String SOCKET_MODULE_NAME = "kernel-d-socket";
/**
*
*/
String SOCKET_EXCEPTION_STEP_CODE = "30";
/**
* Socket
*/
String SOCKET_HOST = "socket_host";
/**
* Socket
*/
String SOCKET_PORT = "socket_port";
/**
* Socket
*/
String SOCKET_SERVER_CHUNK_SIZE = "socket_server_chunk_size";
}

View File

@ -0,0 +1,47 @@
/*
* Copyright [2020-2030] [https://www.stylefeng.cn]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* GunsAPACHE LICENSE 2.0使
*
* 1.LICENSE
* 2.Guns
* 3.
* 4. https://gitee.com/stylefeng/guns
* 5. https://gitee.com/stylefeng/guns
* 6.
*/
package cn.stylefeng.roses.kernel.socket.api.exception;
import cn.stylefeng.roses.kernel.rule.exception.AbstractExceptionEnum;
import cn.stylefeng.roses.kernel.rule.exception.base.ServiceException;
import cn.stylefeng.roses.kernel.socket.api.constants.SocketConstants;
/**
* Socket
*
* @author majianguo
* @date 2021/6/1 11:23
*/
public class SocketException extends ServiceException {
public SocketException(AbstractExceptionEnum exception) {
super(SocketConstants.SOCKET_MODULE_NAME, exception);
}
public SocketException(String errorCode, String userTip) {
super(SocketConstants.SOCKET_MODULE_NAME, errorCode, userTip);
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright [2020-2030] [https://www.stylefeng.cn]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* GunsAPACHE LICENSE 2.0使
*
* 1.LICENSE
* 2.Guns
* 3.
* 4. https://gitee.com/stylefeng/guns
* 5. https://gitee.com/stylefeng/guns
* 6.
*/
package cn.stylefeng.roses.kernel.socket.api.exception.enums;
import cn.stylefeng.roses.kernel.rule.constants.RuleConstants;
import cn.stylefeng.roses.kernel.rule.exception.AbstractExceptionEnum;
import cn.stylefeng.roses.kernel.socket.api.constants.SocketConstants;
import lombok.Getter;
/**
* Socket
*
* @author majianguo
* @date 2021/6/1 11:25
*/
@Getter
public enum SocketExceptionEnum implements AbstractExceptionEnum {
/**
* Socket
*/
SOCKET_ERROR(RuleConstants.THIRD_ERROR_TYPE_CODE + SocketConstants.SOCKET_EXCEPTION_STEP_CODE + "01", "操作异常,具体信息为:{}"),
/**
*
*/
SESSION_NOT_EXIST(RuleConstants.THIRD_ERROR_TYPE_CODE + SocketConstants.SOCKET_EXCEPTION_STEP_CODE + "02", "会话不存在"),
;
/**
*
*/
private final String errorCode;
/**
*
*/
private final String userTip;
SocketExceptionEnum(String errorCode, String userTip) {
this.errorCode = errorCode;
this.userTip = userTip;
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright [2020-2030] [https://www.stylefeng.cn]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* GunsAPACHE LICENSE 2.0使
*
* 1.LICENSE
* 2.Guns
* 3.
* 4. https://gitee.com/stylefeng/guns
* 5. https://gitee.com/stylefeng/guns
* 6.
*/
package cn.stylefeng.roses.kernel.socket.api.session;
/**
* socket
* <p>
*
* WebSocket{@link cn.stylefeng.roses.kernel.socket.websocket.channel}
*
* @author majianguo
* @date 2021/6/1 11:46
*/
public interface SocketSessionOperatorApi {
/**
*
*
* @author majianguo
* @date 2021/6/1 11:48
**/
void writeAndFlush(Object obj);
/**
*
*
* @author majianguo
* @date 2021/6/1 11:48
**/
void writeToChannel(Object obj);
/**
*
*
* @author majianguo
* @date 2021/6/1 11:48
**/
void close();
/**
*
*
* @return {@link boolean}
* @author majianguo
* @date 2021/6/1 11:50
**/
boolean isInvalid();
}

View File

@ -0,0 +1,47 @@
package cn.stylefeng.roses.kernel.socket.api.session.pojo;
import cn.stylefeng.roses.kernel.socket.api.session.SocketSessionOperatorApi;
import lombok.Data;
import java.util.Set;
/**
* Socket
*
* @author majianguo
* @date 2021/6/1 11:28
*/
@Data
public class SocketSession<T extends SocketSessionOperatorApi> {
/**
*
*/
private String sessionId;
/**
*
*/
private Set<String> messageTypes;
/**
*
*/
private Long connectionTime;
/**
*
*/
private Long lastActiveTime;
/**
* API
*/
private T socketOperatorApi;
/**
*
*/
private Object data;
}

View File

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>kernel-d-socket</artifactId>
<version>7.0.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>socket-business-websocket</artifactId>
<dependencies>
<!--Socket模块的Websocket实现-->
<dependency>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>socket-sdk-websocket</artifactId>
<version>7.0.4</version>
</dependency>
<!--web模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,52 @@
package cn.stylefeng.roses.kernel.socket.business.websocket.spring;
import cn.stylefeng.roses.kernel.config.api.constants.ConfigConstants;
import cn.stylefeng.roses.kernel.config.api.context.ConfigContext;
import cn.stylefeng.roses.kernel.socket.websocket.server.WebSocketServer;
import com.gettyio.core.channel.config.ServerConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.net.StandardSocketOptions;
import static cn.stylefeng.roses.kernel.socket.api.constants.SocketConstants.*;
/**
* Spring BootWebSocket
*
* @author majianguo
* @date 2021/6/2 11:06
*/
@Component
@Slf4j
public class WebSocketApplicationRunnerImpl implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
// 初始化配置对象
ServerConfig aioServerConfig = new ServerConfig();
// 设置host,不设置默认0.0.0.0
String socketHost = ConfigContext.me().getSysConfigValueWithDefault(SOCKET_HOST, String.class, "0.0.0.0");
aioServerConfig.setHost(socketHost);
// 设置端口号
Integer socketPort = ConfigContext.me().getSysConfigValueWithDefault(SOCKET_PORT, Integer.class, 11130);
aioServerConfig.setPort(socketPort);
// 设置服务器端内存池最大可分配空间大小默认512mb内存池空间可以根据吞吐量设置。
// 尽量可以设置大一点,因为这不会真正的占用系统内存,只有真正使用时才会分配
Integer socketServerChunkSize = ConfigContext.me().getSysConfigValueWithDefault(SOCKET_SERVER_CHUNK_SIZE, Integer.class, 512 * 1024 * 1024);
aioServerConfig.setServerChunkSize(socketServerChunkSize);
// 设置SocketOptions
aioServerConfig.setOption(StandardSocketOptions.SO_RCVBUF, 8192);
// 启动
WebSocketServer.run(aioServerConfig);
log.info("WebSocket Server Start Success!");
}
}

View File

@ -0,0 +1 @@
socket模块的websocket实现

View File

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>kernel-d-socket</artifactId>
<version>7.0.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>socket-sdk-websocket</artifactId>
<dependencies>
<!--短信模块的api-->
<dependency>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>socket-api</artifactId>
<version>7.0.4</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,46 @@
package cn.stylefeng.roses.kernel.socket.websocket.message;
import cn.stylefeng.roses.kernel.socket.api.SocketMsgCallbackInterface;
import java.util.HashMap;
import java.util.Map;
/**
*
* <p>
*
*
* @author majianguo
* @date 2021/6/1 2:20
*/
public class SocketMessageCenter {
/**
*
*/
private static Map<String, SocketMsgCallbackInterface> messageListenerMap = new HashMap<>();
/**
*
*
* @param msgType
* @param listener
* @author majianguo
* @date 2021/6/1 2:25
**/
public static void setMessageListener(String msgType, SocketMsgCallbackInterface listener) {
messageListenerMap.put(msgType, listener);
}
/**
*
*
* @param msgType
* @return {@link SocketMsgCallbackInterface}
* @author majianguo
* @date 2021/6/1 2:26
**/
public static SocketMsgCallbackInterface getSocketMsgCallbackInterface(String msgType) {
return messageListenerMap.get(msgType);
}
}

View File

@ -0,0 +1,33 @@
package cn.stylefeng.roses.kernel.socket.websocket.message;
import lombok.Data;
/**
* WebSocket
*
* @author majianguo
* @date 2021/6/1 2:56
*/
@Data
public class WebSocketMessagePOJO {
/**
*
*/
private String type;
/**
* Id
*/
private String toId;
/**
* ID
*/
private String formId;
/**
*
*/
private String data;
}

View File

@ -0,0 +1,54 @@
package cn.stylefeng.roses.kernel.socket.websocket.operator;
import cn.hutool.core.util.ObjectUtil;
import cn.stylefeng.roses.kernel.socket.api.SocketMsgCallbackInterface;
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
import cn.stylefeng.roses.kernel.socket.api.exception.SocketException;
import cn.stylefeng.roses.kernel.socket.api.exception.enums.SocketExceptionEnum;
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
import cn.stylefeng.roses.kernel.socket.websocket.message.SocketMessageCenter;
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
import java.util.Collection;
/**
* WebSocket
* <p>
* Spring bootSocketOperatorApisocketSpring
*
* @author majianguo
* @date 2021/6/2 10:41
*/
public class WebSocketOperator implements SocketOperatorApi {
@Override
public void sendMsgOfSession(String sessionId, Object msg) {
// 获取会话
SocketSession<GettySocketOperator> socketSession = SessionCenter.getSessionById(sessionId);
if (ObjectUtil.isEmpty(socketSession)) {
throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
}
// 发送内容
socketSession.getSocketOperatorApi().writeAndFlush(msg);
}
@Override
public void sendMsgOfAllSession(Object msg) {
// 获取所有会话
Collection<SocketSession<GettySocketOperator>> socketSessions = SessionCenter.getSocketSessionMap().values();
if (ObjectUtil.isNotEmpty(socketSessions)) {
// 给所有会话发送消息
for (SocketSession<?> socketSession : socketSessions) {
// 发送内容
socketSession.getSocketOperatorApi().writeAndFlush(msg);
}
}
}
@Override
public void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface) {
SocketMessageCenter.setMessageListener(msgType, callbackInterface);
}
}

View File

@ -0,0 +1,15 @@
package cn.stylefeng.roses.kernel.socket.websocket.operator.channel;
import cn.stylefeng.roses.kernel.socket.api.session.SocketSessionOperatorApi;
/**
* Api
* <p>
* SocketOperatorApi
*
* @author majianguo
* @date 2021/6/1 3:44
*/
public interface GettyChannelExpandInterFace extends SocketSessionOperatorApi {
}

View File

@ -0,0 +1,56 @@
package cn.stylefeng.roses.kernel.socket.websocket.operator.channel;
import com.gettyio.core.channel.SocketChannel;
import com.gettyio.expansion.handler.codec.websocket.frame.TextWebSocketFrame;
/**
* Socket
* <p>
* 使Getty,GettySocketChannel
*
* @author majianguo
* @date 2021/6/1 3:41
*/
public class GettySocketOperator implements GettyChannelExpandInterFace {
/**
*
*/
private SocketChannel socketChannel;
public GettySocketOperator(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void writeAndFlush(Object obj) {
if (obj instanceof String) {
// 处理WebSocket的数据
TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(obj.toString());
socketChannel.writeAndFlush(textWebSocketFrame);
return;
}
socketChannel.writeAndFlush(obj);
}
@Override
public void writeToChannel(Object obj) {
if (obj instanceof String) {
// 处理WebSocket的数据
TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(obj.toString());
socketChannel.writeToChannel(textWebSocketFrame);
return;
}
socketChannel.writeToChannel(obj);
}
@Override
public void close() {
socketChannel.close();
}
@Override
public boolean isInvalid() {
return socketChannel.isInvalid();
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.stylefeng.roses.kernel.socket.websocket.server;
import cn.stylefeng.roses.kernel.socket.websocket.server.handler.WebSocketMessageHandler;
import com.gettyio.core.channel.SocketChannel;
import com.gettyio.core.pipeline.ChannelInitializer;
import com.gettyio.core.pipeline.DefaultChannelPipeline;
import com.gettyio.expansion.handler.codec.websocket.WebSocketDecoder;
import com.gettyio.expansion.handler.codec.websocket.WebSocketEncoder;
/**
* WebSocket
*
* @author majianguo
* @date 2021/6/1 2:36
*/
public class WebSocketInitializer extends ChannelInitializer {
@Override
public void initChannel(SocketChannel channel) throws Exception {
// 获取责任链对象
DefaultChannelPipeline pipeline = channel.getDefaultChannelPipeline();
// 先把ws的编解码器添加到责任链前面。注意只有先通过ws的编解码器才能解析ws的消息帧
// 后续的解码器才能继续解析期望得到的结果
pipeline.addLast(new WebSocketEncoder());
pipeline.addLast(new WebSocketDecoder());
// 添加自定义的消息处理器
pipeline.addLast(new WebSocketMessageHandler());
}
}

View File

@ -0,0 +1,70 @@
package cn.stylefeng.roses.kernel.socket.websocket.server;
import com.gettyio.core.channel.config.ServerConfig;
import com.gettyio.core.channel.starter.AioServerStarter;
import java.net.StandardSocketOptions;
/**
* WebSocket
*
* @author majianguo
* @date 2021/6/1 2:40
*/
public class WebSocketServer {
/**
*
*
* @author majianguo
* @date 2021/6/2 11:10
**/
public static void start() {
// 初始化配置对象
ServerConfig aioServerConfig = new ServerConfig();
// 设置host,不设置默认localhost
aioServerConfig.setHost("0.0.0.0");
// 设置端口号
aioServerConfig.setPort(11130);
// 设置服务器端内存池最大可分配空间大小默认256mb内存池空间可以根据吞吐量设置。
// 尽量可以设置大一点,因为这不会真正的占用系统内存,只有真正使用时才会分配
aioServerConfig.setServerChunkSize(512 * 1024 * 1024);
// 设置数据输出器队列大小一般不用设置这个参数默认是10*1024*1024
aioServerConfig.setBufferWriterQueueSize(10 * 1024 * 1024);
// 设置读取缓存块大小一般不用设置这个参数默认128字节
aioServerConfig.setReadBufferSize(2048);
// 设置内存池等待分配内存的最大阻塞时间默认是1秒
aioServerConfig.setChunkPoolBlockTime(2000);
// 设置SocketOptions
aioServerConfig.setOption(StandardSocketOptions.SO_RCVBUF, 8192);
// 启动
run(aioServerConfig);
}
/**
* Socket
*
* @param aioServerConfig
* @author majianguo
* @date 2021/6/1 2:40
**/
public static void run(ServerConfig aioServerConfig) {
final AioServerStarter starter = new AioServerStarter(aioServerConfig);
starter.channelInitializer(new WebSocketInitializer());
try {
// 启动服务
starter.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,98 @@
package cn.stylefeng.roses.kernel.socket.websocket.server.bind;
import com.gettyio.core.channel.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
*
*
* @author majianguo
* @date 2021/6/1 3:09
*/
public class ChannelIdAndUserBindCenter {
/**
*
*/
private static ConcurrentMap<String, String> channelIdAndUserBind = new ConcurrentHashMap<>();
/**
*
*/
private static List<SocketChannel> waitingBindList = Collections.synchronizedList(new ArrayList<>());
/**
* ID
*
* @param channelId ID
* @return {@link java.lang.String}
* @author majianguo
* @date 2021/6/1 3:33
**/
public static String getUserId(String channelId) {
return channelIdAndUserBind.get(channelId);
}
/**
*
*
* @param socketChannel
* @author majianguo
* @date 2021/6/1 3:17
**/
public static void addSocketChannel(SocketChannel socketChannel) {
waitingBindList.add(socketChannel);
}
/**
*
*
* @param channelId ID
* @param userId ID
* @return {@link boolean}
* @author majianguo
* @date 2021/6/1 3:21
**/
public static boolean bind(String channelId, String userId) {
Iterator<SocketChannel> iterator = waitingBindList.iterator();
while (iterator.hasNext()) {
SocketChannel item = iterator.next();
if (item.getChannelId().equals(channelId)) {
channelIdAndUserBind.put(channelId, userId);
iterator.remove();
return true;
}
}
return false;
}
/**
*
*
* @param userId ID
* @return {@link boolean}
* @author majianguo
* @date 2021/6/1 3:29
**/
public static boolean isBind(String userId) {
return channelIdAndUserBind.containsValue(userId);
}
/**
*
*
* @param channelId ID
* @author majianguo
* @date 2021/6/1 3:31
**/
public static void closed(String channelId) {
waitingBindList.removeIf(item -> item.getChannelId().equals(channelId));
channelIdAndUserBind.remove(channelId);
}
}

View File

@ -0,0 +1,86 @@
package cn.stylefeng.roses.kernel.socket.websocket.server.handler;
import cn.hutool.core.util.ObjectUtil;
import cn.stylefeng.roses.kernel.socket.api.SocketMsgCallbackInterface;
import cn.stylefeng.roses.kernel.socket.websocket.message.SocketMessageCenter;
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
import cn.stylefeng.roses.kernel.socket.websocket.server.bind.ChannelIdAndUserBindCenter;
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
import cn.stylefeng.roses.kernel.socket.websocket.message.WebSocketMessagePOJO;
import com.alibaba.fastjson.JSON;
import com.gettyio.core.channel.SocketChannel;
import com.gettyio.core.pipeline.in.SimpleChannelInboundHandler;
import com.gettyio.expansion.handler.codec.websocket.frame.TextWebSocketFrame;
import com.gettyio.expansion.handler.codec.websocket.frame.WebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
/**
*
*
* @author majianguo
* @date 2021/6/1 2:35
*/
public class WebSocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private final Logger log = LoggerFactory.getLogger(WebSocketMessageHandler.class);
@Override
public void channelAdded(SocketChannel aioChannel) throws Exception {
log.info(aioChannel.getChannelId() + " connection successful.");
ChannelIdAndUserBindCenter.addSocketChannel(aioChannel);
}
@Override
public void channelClosed(SocketChannel aioChannel) throws Exception {
log.info(aioChannel.getChannelId() + " disconnected");
// 获取用户ID
String userId = ChannelIdAndUserBindCenter.getUserId(aioChannel.getChannelId());
if (ObjectUtil.isNotEmpty(userId)) {
// 根据用户ID关闭会话
SessionCenter.closed(userId);
}
ChannelIdAndUserBindCenter.closed(aioChannel.getChannelId());
}
@Override
public void channelRead0(SocketChannel socketChannel, WebSocketFrame webSocketFrame) throws Exception {
if (webSocketFrame instanceof TextWebSocketFrame) {
String data = new String(webSocketFrame.getPayloadData(), StandardCharsets.UTF_8);
// 转换为Java对象
WebSocketMessagePOJO webSocketMessagePOJO = JSON.toJavaObject(JSON.parseObject(data), WebSocketMessagePOJO.class);
// 维护通道和用户ID的绑定关系
if (!ChannelIdAndUserBindCenter.isBind(webSocketMessagePOJO.getFormId())) {
ChannelIdAndUserBindCenter.bind(socketChannel.getChannelId(), webSocketMessagePOJO.getFormId());
// 创建api的会话对象
SocketSession<GettySocketOperator> socketSession = new SocketSession<>();
socketSession.setSessionId(webSocketMessagePOJO.getFormId());
socketSession.setSocketOperatorApi(new GettySocketOperator(socketChannel));
socketSession.setConnectionTime(System.currentTimeMillis());
socketSession.setLastActiveTime(System.currentTimeMillis());
// 维护会话
SessionCenter.addSocketSession(socketSession);
}
// 找到该消息的处理器
SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(webSocketMessagePOJO.getType());
if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
// 获取会话
SocketSession<GettySocketOperator> session = SessionCenter.getSessionById(webSocketMessagePOJO.getFormId());
// 触发回调
socketMsgCallbackInterface.callback(webSocketMessagePOJO.getType(), webSocketMessagePOJO, session);
} else {
socketChannel.writeAndFlush(new TextWebSocketFrame("{\"code\":\"404\"}"));
}
}
}
}

View File

@ -0,0 +1,141 @@
package cn.stylefeng.roses.kernel.socket.websocket.session;
import cn.hutool.core.util.ObjectUtil;
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
*
* <p>
*
*
* @author majianguo
* @date 2021/6/1 1:43
*/
public class SessionCenter {
/**
*
*/
private static ConcurrentMap<String, SocketSession<GettySocketOperator>> socketSessionMap = new ConcurrentHashMap<>();
/**
* ID
*/
private static ConcurrentMap<String, List<String>> messageTypeSessionMap = new ConcurrentHashMap<>();
/**
*
*
* @return {@link ConcurrentMap< String, SocketSession<GettySocketOperator>>}
* @author majianguo
* @date 2021/6/1 2:13
**/
public static ConcurrentMap<String, SocketSession<GettySocketOperator>> getSocketSessionMap() {
return socketSessionMap;
}
/**
* ID
*
* @return {@link ConcurrentMap< String, List< String>>}
* @author majianguo
* @date 2021/6/1 2:14
**/
public static ConcurrentMap<String, List<String>> getMessageTypeSessionMap() {
return messageTypeSessionMap;
}
/**
* ID
*
* @param sessionId ID
* @return {@link SocketSession <GettySocketOperator>}
* @author majianguo
* @date 2021/6/1 1:48
**/
public static SocketSession<GettySocketOperator> getSessionById(String sessionId) {
return socketSessionMap.get(sessionId);
}
/**
*
*
* @param socketSession
* @author majianguo
* @date 2021/6/1 1:49
**/
public static void addSocketSession(SocketSession<GettySocketOperator> socketSession) {
// 维护会话
socketSessionMap.put(socketSession.getSessionId(), socketSession);
// 维护会话所有的消息类型和会话的关系
if (ObjectUtil.isNotEmpty(socketSession.getMessageTypes())) {
for (String messageType : socketSession.getMessageTypes()) {
List<String> sessionIds = messageTypeSessionMap.get(messageType);
if (ObjectUtil.isEmpty(sessionIds)) {
sessionIds = new ArrayList<>();
messageTypeSessionMap.put(messageType, sessionIds);
}
sessionIds.add(socketSession.getSessionId());
}
}
}
/**
*
*
* @return {@link List< SocketSession<GettySocketOperator>>}
* @author majianguo
* @date 2021/6/1 2:06
**/
public static List<SocketSession<GettySocketOperator>> getSocketSessionByMsgType(String msgType) {
List<SocketSession<GettySocketOperator>> res = new ArrayList<>();
// 获取监听该消息所有的会话
List<String> stringList = messageTypeSessionMap.get(msgType);
if (ObjectUtil.isNotEmpty(stringList)) {
for (String sessionId : stringList) {
SocketSession<GettySocketOperator> socketSession = socketSessionMap.get(sessionId);
res.add(socketSession);
}
}
return res;
}
/**
*
*
* @param msgType
* @param sessionId ID
* @author majianguo
* @date 2021/6/1 2:11
**/
public static void addSocketSessionMsgType(String msgType, String sessionId) {
SocketSession<GettySocketOperator> socketSession = socketSessionMap.get(sessionId);
if (ObjectUtil.isNotEmpty(socketSession)) {
socketSession.getMessageTypes().add(msgType);
}
}
/**
*
*
* @param sessionId
* @author majianguo
* @date 2021/6/1 3:25
**/
public static void closed(String sessionId) {
socketSessionMap.remove(sessionId);
for (Map.Entry<String, List<String>> stringListEntry : messageTypeSessionMap.entrySet()) {
stringListEntry.getValue().removeIf(item -> item.equals(sessionId));
}
}
}

View File

@ -0,0 +1 @@
Socket的spring boot自动加载模块

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>kernel-d-socket</artifactId>
<version>7.0.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>socket-spring-boot-starter</artifactId>
<packaging>jar</packaging>
<dependencies>
<!-- WebSocket模块 -->
<dependency>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>socket-business-websocket</artifactId>
<version>7.0.4</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,55 @@
/*
* Copyright [2020-2030] [https://www.stylefeng.cn]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* GunsAPACHE LICENSE 2.0使
*
* 1.LICENSE
* 2.Guns
* 3.
* 4. https://gitee.com/stylefeng/guns
* 5. https://gitee.com/stylefeng/guns
* 6.
*/
package cn.stylefeng.roses.kernel.socket.starter;
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
import cn.stylefeng.roses.kernel.socket.websocket.operator.WebSocketOperator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Socket
*
* @author fengshuonan
* @date 2020/12/1 21:18
*/
@Configuration
public class GunsSocketAutoConfiguration {
/**
* Socket
*
* @return {@link SocketOperatorApi}
* @author majianguo
* @date 2021/6/2 11:02
**/
@Bean
@ConditionalOnMissingBean(SocketOperatorApi.class)
public SocketOperatorApi socketOperatorApi() {
return new WebSocketOperator();
}
}

View File

@ -0,0 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.stylefeng.roses.kernel.socket.starter.GunsSocketAutoConfiguration

18
pom.xml
View File

@ -103,6 +103,9 @@
<!-- 分布式事务seata -->
<module>kernel-d-seata</module>
<!-- Socket模块 -->
<module>kernel-d-socket</module>
</modules>
<properties>
@ -133,6 +136,7 @@
<groovy.version>3.0.7</groovy.version>
<oshi.version>5.7.1</oshi.version>
<beetl.version>3.3.2.RELEASE</beetl.version>
<getty.version>1.4.9</getty.version>
</properties>
<dependencyManagement>
@ -292,6 +296,20 @@
<version>${oshi.version}</version>
</dependency>
<!--getty核心包-->
<dependency>
<groupId>com.gettyio</groupId>
<artifactId>getty-core</artifactId>
<version>${getty.version}</version>
</dependency>
<!-- 拓展包 -->
<dependency>
<groupId>com.gettyio</groupId>
<artifactId>getty-expansion</artifactId>
<version>${getty.version}</version>
</dependency>
</dependencies>
</dependencyManagement>