mirror of https://gitee.com/stylefeng/roses
更改WebSocket默认实现为Spring Boot实现
parent
767fa4cb58
commit
88c14e119f
|
@ -38,13 +38,6 @@
|
|||
<artifactId>scanner-api</artifactId>
|
||||
<version>${roses.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!--WebSocket模块的Api-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>socket-api</artifactId>
|
||||
<version>${roses.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -32,7 +32,6 @@ import cn.stylefeng.roses.kernel.auth.api.pojo.login.basic.SimpleRoleInfo;
|
|||
import cn.stylefeng.roses.kernel.auth.api.pojo.login.basic.SimpleUserInfo;
|
||||
import cn.stylefeng.roses.kernel.rule.constants.RuleConstants;
|
||||
import cn.stylefeng.roses.kernel.scanner.api.annotation.field.ChineseDescription;
|
||||
import cn.stylefeng.roses.kernel.socket.api.expander.SocketConfigExpander;
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
@ -174,7 +173,6 @@ public class LoginUser implements Serializable {
|
|||
|
||||
Map<String, Long> params = new HashMap<>(1);
|
||||
params.put("userId", this.userId);
|
||||
params.put("port", Long.valueOf(SocketConfigExpander.getSocketPort()));
|
||||
return StrUtil.format(this.wsUrl, params);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
<modules>
|
||||
<module>socket-api</module>
|
||||
<module>socket-sdk-websocket</module>
|
||||
<module>socket-business-websocket</module>
|
||||
<module>socket-spring-boot-starter</module>
|
||||
</modules>
|
||||
|
|
|
@ -14,11 +14,11 @@
|
|||
<artifactId>socket-business-websocket</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<!--Socket模块的Websocket实现-->
|
||||
|
||||
<!--Spring boot WebSocket实现-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>socket-sdk-websocket</artifactId>
|
||||
<version>${roses.version}</version>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!--web模块-->
|
||||
|
@ -26,6 +26,20 @@
|
|||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!--socket模块api-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>socket-api</artifactId>
|
||||
<version>${roses.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!--jwt模块的sdk-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>jwt-sdk</artifactId>
|
||||
<version>${roses.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,20 @@
|
|||
package cn.stylefeng.roses.kernel.socket.business.websocket.config;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
|
||||
|
||||
/**
|
||||
* 开启WebSocket支持
|
||||
*
|
||||
* @author majianguo
|
||||
* @date 2021/6/21 下午5:01
|
||||
*/
|
||||
@Configuration
|
||||
public class WebSocketConfig {
|
||||
|
||||
@Bean
|
||||
public ServerEndpointExporter serverEndpointExporter() {
|
||||
return new ServerEndpointExporter();
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.message;
|
||||
package cn.stylefeng.roses.kernel.socket.business.websocket.message;
|
||||
|
||||
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
|
||||
import java.util.HashMap;
|
|
@ -1,4 +1,4 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.operator;
|
||||
package cn.stylefeng.roses.kernel.socket.business.websocket.operator;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
|
||||
|
@ -6,10 +6,10 @@ import cn.stylefeng.roses.kernel.socket.api.exception.SocketException;
|
|||
import cn.stylefeng.roses.kernel.socket.api.exception.enums.SocketExceptionEnum;
|
||||
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
|
||||
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.message.SocketMessageCenter;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.pojo.WebSocketMessageDTO;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
|
||||
import cn.stylefeng.roses.kernel.socket.business.websocket.message.SocketMessageCenter;
|
||||
import cn.stylefeng.roses.kernel.socket.business.websocket.pojo.WebSocketMessageDTO;
|
||||
import cn.stylefeng.roses.kernel.socket.business.websocket.session.SessionCenter;
|
||||
import cn.stylefeng.roses.kernel.socket.business.websocket.operator.channel.GettySocketOperator;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
|
@ -1,4 +1,4 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.operator.channel;
|
||||
package cn.stylefeng.roses.kernel.socket.business.websocket.operator.channel;
|
||||
|
||||
import cn.stylefeng.roses.kernel.socket.api.session.SocketSessionOperatorApi;
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
package cn.stylefeng.roses.kernel.socket.business.websocket.operator.channel;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import javax.websocket.Session;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Socket操作类实现
|
||||
* <p>
|
||||
* 简单封装Spring Boot的默认WebSocket
|
||||
*
|
||||
* @author majianguo
|
||||
* @date 2021/6/1 下午3:41
|
||||
*/
|
||||
public class GettySocketOperator implements GettyChannelExpandInterFace {
|
||||
|
||||
/**
|
||||
* 实际操作的通道
|
||||
*/
|
||||
private Session socketChannel;
|
||||
|
||||
public GettySocketOperator(Session socketChannel) {
|
||||
this.socketChannel = socketChannel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeAndFlush(Object obj) {
|
||||
try {
|
||||
socketChannel.getBasicRemote().sendText(JSON.toJSONString(obj));
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeToChannel(Object obj) {
|
||||
socketChannel.getAsyncRemote().sendText(JSON.toJSONString(obj));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
try {
|
||||
socketChannel.close();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInvalid() {
|
||||
return socketChannel.isOpen();
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.pojo;
|
||||
package cn.stylefeng.roses.kernel.socket.business.websocket.pojo;
|
||||
|
||||
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
|
||||
import lombok.Data;
|
|
@ -0,0 +1,149 @@
|
|||
package cn.stylefeng.roses.kernel.socket.business.websocket.server;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.stylefeng.roses.kernel.jwt.api.context.JwtContext;
|
||||
import cn.stylefeng.roses.kernel.socket.api.enums.ClientMessageTypeEnum;
|
||||
import cn.stylefeng.roses.kernel.socket.api.enums.ServerMessageTypeEnum;
|
||||
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
|
||||
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
||||
import cn.stylefeng.roses.kernel.socket.business.websocket.message.SocketMessageCenter;
|
||||
import cn.stylefeng.roses.kernel.socket.business.websocket.operator.channel.GettySocketOperator;
|
||||
import cn.stylefeng.roses.kernel.socket.business.websocket.pojo.WebSocketMessageDTO;
|
||||
import cn.stylefeng.roses.kernel.socket.business.websocket.session.SessionCenter;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.websocket.*;
|
||||
import javax.websocket.server.PathParam;
|
||||
import javax.websocket.server.ServerEndpoint;
|
||||
|
||||
/**
|
||||
* 消息监听处理器
|
||||
*
|
||||
* @author majianguo
|
||||
* @date 2021/6/1 下午2:35
|
||||
*/
|
||||
@Slf4j
|
||||
@ServerEndpoint(value = "/webSocket/{userId}")
|
||||
@Component
|
||||
public class WebSocketServer {
|
||||
|
||||
/**
|
||||
* 连接建立调用的方法
|
||||
* <p>
|
||||
* 暂时无用,需要在建立连接的时候做一些事情的话可以修改这里
|
||||
*
|
||||
* @param session 会话信息
|
||||
* @author majianguo
|
||||
* @date 2021/6/21 下午5:14
|
||||
**/
|
||||
@OnOpen
|
||||
public void onOpen(Session session, @PathParam("userId") String userId) {
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接关闭调用的方法
|
||||
*
|
||||
* @param session 会话信息
|
||||
* @author majianguo
|
||||
* @date 2021/6/21 下午5:14
|
||||
**/
|
||||
@OnClose
|
||||
public void onClose(Session session) {
|
||||
SessionCenter.closed(session.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* 收到消息调用的方法
|
||||
*
|
||||
* @param message 接收到的消息
|
||||
* @param socketChannel 会话信息
|
||||
* @author majianguo
|
||||
* @date 2021/6/21 下午5:14
|
||||
**/
|
||||
@OnMessage
|
||||
public void onMessage(String message, Session socketChannel) {
|
||||
|
||||
// 转换为Java对象
|
||||
WebSocketMessageDTO WebSocketMessageDTO = JSON.parseObject(message, WebSocketMessageDTO.class);
|
||||
|
||||
// 心跳包
|
||||
if (ClientMessageTypeEnum.USER_HEART.getCode().equals(WebSocketMessageDTO.getClientMsgType())) {
|
||||
// 更新会话最后活跃时间
|
||||
SocketSession<GettySocketOperator> session = SessionCenter.getSessionBySessionId(socketChannel.getId());
|
||||
if (ObjectUtil.isNotEmpty(session)) {
|
||||
session.setLastActiveTime(System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
||||
// 用户ID为空不处理直接跳过
|
||||
if (ObjectUtil.isEmpty(WebSocketMessageDTO.getFormUserId())) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 维护通道是否已初始化
|
||||
SocketSession<GettySocketOperator> socketSession = SessionCenter.getSessionBySessionId(socketChannel.getId());
|
||||
if (ObjectUtil.isEmpty(socketSession) && ClientMessageTypeEnum.USER_CONNECTION_AUTHENTICATION.getCode().equals(WebSocketMessageDTO.getClientMsgType())) {
|
||||
// 操作api包装
|
||||
GettySocketOperator gettySocketOperator = new GettySocketOperator(socketChannel);
|
||||
|
||||
// 回复消息
|
||||
WebSocketMessageDTO replyMsg = new WebSocketMessageDTO();
|
||||
replyMsg.setServerMsgType(ServerMessageTypeEnum.SYS_REPLY_MSG_TYPE.getCode());
|
||||
replyMsg.setToUserId(WebSocketMessageDTO.getFormUserId());
|
||||
|
||||
try {
|
||||
// 校验token是否合法
|
||||
JwtContext.me().validateTokenWithException(WebSocketMessageDTO.getData().toString());
|
||||
|
||||
// 设置回复内容
|
||||
replyMsg.setData(socketChannel.getId());
|
||||
|
||||
// 创建会话对象
|
||||
socketSession = new SocketSession<>();
|
||||
socketSession.setSessionId(socketChannel.getId());
|
||||
socketSession.setUserId(WebSocketMessageDTO.getFormUserId());
|
||||
socketSession.setSocketOperatorApi(gettySocketOperator);
|
||||
socketSession.setConnectionTime(System.currentTimeMillis());
|
||||
|
||||
// 维护会话
|
||||
SessionCenter.addSocketSession(socketSession);
|
||||
} finally {
|
||||
// 回复消息
|
||||
gettySocketOperator.writeAndFlush(replyMsg);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 会话建立成功执行业务逻辑
|
||||
if (ObjectUtil.isNotEmpty(socketSession)) {
|
||||
|
||||
// 更新最后会话时间
|
||||
socketSession.setLastActiveTime(System.currentTimeMillis());
|
||||
|
||||
// 找到该消息的处理器
|
||||
SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(WebSocketMessageDTO.getClientMsgType());
|
||||
if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
|
||||
// 触发回调
|
||||
socketMsgCallbackInterface.callback(WebSocketMessageDTO.getClientMsgType(), WebSocketMessageDTO, socketSession);
|
||||
} else {
|
||||
socketChannel.getAsyncRemote().sendText("{\"serverMsgType\":\"404\"}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话发送异常调用的方法
|
||||
*
|
||||
* @param session 会话信息
|
||||
* @param error 错误信息
|
||||
* @author majianguo
|
||||
* @date 2021/6/21 下午5:14
|
||||
**/
|
||||
@OnError
|
||||
public void onError(Session session, Throwable error) {
|
||||
log.error("发生错误");
|
||||
error.printStackTrace();
|
||||
}
|
||||
}
|
|
@ -1,8 +1,8 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.session;
|
||||
package cn.stylefeng.roses.kernel.socket.business.websocket.session;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
|
||||
import cn.stylefeng.roses.kernel.socket.business.websocket.operator.channel.GettySocketOperator;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
|
@ -1,45 +0,0 @@
|
|||
package cn.stylefeng.roses.kernel.socket.business.websocket.spring;
|
||||
|
||||
import cn.stylefeng.roses.kernel.socket.api.expander.SocketConfigExpander;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.server.WebSocketServer;
|
||||
import com.gettyio.core.channel.config.ServerConfig;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.stereotype.Component;
|
||||
import java.net.StandardSocketOptions;
|
||||
|
||||
/**
|
||||
* Spring Boot启动完成拉起WebSocket
|
||||
*
|
||||
* @author majianguo
|
||||
* @date 2021/6/2 上午11:06
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class WebSocketApplicationRunnerImpl implements ApplicationRunner {
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) {
|
||||
// 初始化配置对象
|
||||
ServerConfig aioServerConfig = new ServerConfig();
|
||||
|
||||
// 设置host,默认0.0.0.0
|
||||
aioServerConfig.setHost(SocketConfigExpander.getSocketHost());
|
||||
|
||||
// 设置端口号,默认11130
|
||||
aioServerConfig.setPort(SocketConfigExpander.getSocketPort());
|
||||
|
||||
// 设置服务器端内存池最大可分配空间大小,默认512mb,内存池空间可以根据吞吐量设置。
|
||||
// 尽量可以设置大一点,因为这不会真正的占用系统内存,只有真正使用时才会分配
|
||||
aioServerConfig.setServerChunkSize(SocketConfigExpander.getSocketServerChunkSize());
|
||||
|
||||
// 设置SocketOptions
|
||||
aioServerConfig.setOption(StandardSocketOptions.SO_RCVBUF, 8192);
|
||||
|
||||
// 启动
|
||||
WebSocketServer.run(aioServerConfig);
|
||||
|
||||
log.info("WebSocket Server Start Success!");
|
||||
}
|
||||
}
|
|
@ -1 +0,0 @@
|
|||
socket模块的websocket实现
|
|
@ -1,31 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>kernel-d-socket</artifactId>
|
||||
<version>7.0.4</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>socket-sdk-websocket</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<!--短信模块的api-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>socket-api</artifactId>
|
||||
<version>${roses.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!--jwt模块的sdk-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>jwt-sdk</artifactId>
|
||||
<version>${roses.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -1,46 +0,0 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.operator.channel;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.gettyio.core.channel.SocketChannel;
|
||||
import com.gettyio.expansion.handler.codec.websocket.frame.TextWebSocketFrame;
|
||||
|
||||
/**
|
||||
* Socket操作类实现
|
||||
* <p>
|
||||
* 这里使用的是Getty,所以对Getty的SocketChannel对象做简单封装
|
||||
*
|
||||
* @author majianguo
|
||||
* @date 2021/6/1 下午3:41
|
||||
*/
|
||||
public class GettySocketOperator implements GettyChannelExpandInterFace {
|
||||
|
||||
/**
|
||||
* 实际操作的通道
|
||||
*/
|
||||
private SocketChannel socketChannel;
|
||||
|
||||
public GettySocketOperator(SocketChannel socketChannel) {
|
||||
this.socketChannel = socketChannel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeAndFlush(Object obj) {
|
||||
TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(JSON.toJSONString(obj));
|
||||
socketChannel.writeAndFlush(textWebSocketFrame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeToChannel(Object obj) {
|
||||
socketChannel.writeToChannel(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
socketChannel.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInvalid() {
|
||||
return socketChannel.isInvalid();
|
||||
}
|
||||
}
|
|
@ -1,31 +0,0 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.server;
|
||||
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.server.handler.WebSocketMessageHandler;
|
||||
import com.gettyio.core.channel.SocketChannel;
|
||||
import com.gettyio.core.pipeline.ChannelInitializer;
|
||||
import com.gettyio.core.pipeline.DefaultChannelPipeline;
|
||||
import com.gettyio.expansion.handler.codec.websocket.WebSocketDecoder;
|
||||
import com.gettyio.expansion.handler.codec.websocket.WebSocketEncoder;
|
||||
|
||||
/**
|
||||
* WebSocket通道责任链对象
|
||||
*
|
||||
* @author majianguo
|
||||
* @date 2021/6/1 下午2:36
|
||||
*/
|
||||
public class WebSocketInitializer extends ChannelInitializer {
|
||||
|
||||
@Override
|
||||
public void initChannel(SocketChannel channel) {
|
||||
// 获取责任链对象
|
||||
DefaultChannelPipeline pipeline = channel.getDefaultChannelPipeline();
|
||||
|
||||
// 先把ws的编解码器添加到责任链前面。注意,只有先通过ws的编解码器,才能解析ws的消息帧,
|
||||
// 后续的解码器才能继续解析期望得到的结果
|
||||
pipeline.addLast(new WebSocketEncoder());
|
||||
pipeline.addLast(new WebSocketDecoder());
|
||||
|
||||
// 添加自定义的消息处理器
|
||||
pipeline.addLast(new WebSocketMessageHandler());
|
||||
}
|
||||
}
|
|
@ -1,70 +0,0 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.server;
|
||||
|
||||
import com.gettyio.core.channel.config.ServerConfig;
|
||||
import com.gettyio.core.channel.starter.AioServerStarter;
|
||||
|
||||
import java.net.StandardSocketOptions;
|
||||
|
||||
/**
|
||||
* WebSocket服务端
|
||||
*
|
||||
* @author majianguo
|
||||
* @date 2021/6/1 下午2:40
|
||||
*/
|
||||
public class WebSocketServer {
|
||||
|
||||
/**
|
||||
* 无参数启动(开发测试使用)
|
||||
*
|
||||
* @author majianguo
|
||||
* @date 2021/6/2 上午11:10
|
||||
**/
|
||||
public static void start() {
|
||||
// 初始化配置对象
|
||||
ServerConfig aioServerConfig = new ServerConfig();
|
||||
|
||||
// 设置host,不设置默认localhost
|
||||
aioServerConfig.setHost("0.0.0.0");
|
||||
|
||||
// 设置端口号
|
||||
aioServerConfig.setPort(11130);
|
||||
|
||||
// 设置服务器端内存池最大可分配空间大小,默认256mb,内存池空间可以根据吞吐量设置。
|
||||
// 尽量可以设置大一点,因为这不会真正的占用系统内存,只有真正使用时才会分配
|
||||
aioServerConfig.setServerChunkSize(512 * 1024 * 1024);
|
||||
|
||||
// 设置数据输出器队列大小,一般不用设置这个参数,默认是10*1024*1024
|
||||
aioServerConfig.setBufferWriterQueueSize(10 * 1024 * 1024);
|
||||
|
||||
// 设置读取缓存块大小,一般不用设置这个参数,默认128字节
|
||||
aioServerConfig.setReadBufferSize(2048);
|
||||
|
||||
// 设置内存池等待分配内存的最大阻塞时间,默认是1秒
|
||||
aioServerConfig.setChunkPoolBlockTime(2000);
|
||||
|
||||
// 设置SocketOptions
|
||||
aioServerConfig.setOption(StandardSocketOptions.SO_RCVBUF, 8192);
|
||||
|
||||
// 启动
|
||||
run(aioServerConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动Socket服务
|
||||
*
|
||||
* @param aioServerConfig 服务器配置
|
||||
* @author majianguo
|
||||
* @date 2021/6/1 下午2:40
|
||||
**/
|
||||
public static void run(ServerConfig aioServerConfig) {
|
||||
final AioServerStarter starter = new AioServerStarter(aioServerConfig);
|
||||
starter.channelInitializer(new WebSocketInitializer());
|
||||
try {
|
||||
// 启动服务
|
||||
starter.start();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,116 +0,0 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.server.handler;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.stylefeng.roses.kernel.jwt.api.context.JwtContext;
|
||||
import cn.stylefeng.roses.kernel.socket.api.enums.ClientMessageTypeEnum;
|
||||
import cn.stylefeng.roses.kernel.socket.api.enums.ServerMessageTypeEnum;
|
||||
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.message.SocketMessageCenter;
|
||||
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.pojo.WebSocketMessageDTO;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.gettyio.core.channel.SocketChannel;
|
||||
import com.gettyio.core.pipeline.in.SimpleChannelInboundHandler;
|
||||
import com.gettyio.expansion.handler.codec.websocket.frame.TextWebSocketFrame;
|
||||
import com.gettyio.expansion.handler.codec.websocket.frame.WebSocketFrame;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* 消息监听处理器
|
||||
*
|
||||
* @author majianguo
|
||||
* @date 2021/6/1 下午2:35
|
||||
*/
|
||||
@Slf4j
|
||||
public class WebSocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
|
||||
|
||||
@Override
|
||||
public void channelAdded(SocketChannel aioChannel) {
|
||||
log.info(aioChannel.getChannelId() + " connection successful.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelClosed(SocketChannel aioChannel) {
|
||||
log.info(aioChannel.getChannelId() + " disconnected");
|
||||
SessionCenter.closed(aioChannel.getChannelId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead0(SocketChannel socketChannel, WebSocketFrame webSocketFrame) {
|
||||
|
||||
if (webSocketFrame instanceof TextWebSocketFrame) {
|
||||
String data = new String(webSocketFrame.getPayloadData(), StandardCharsets.UTF_8);
|
||||
|
||||
// 转换为Java对象
|
||||
WebSocketMessageDTO WebSocketMessageDTO = JSON.toJavaObject(JSON.parseObject(data), WebSocketMessageDTO.class);
|
||||
|
||||
// 心跳包
|
||||
if (ClientMessageTypeEnum.USER_HEART.getCode().equals(WebSocketMessageDTO.getClientMsgType())) {
|
||||
// 更新会话最后活跃时间
|
||||
SocketSession<GettySocketOperator> session = SessionCenter.getSessionBySessionId(socketChannel.getChannelId());
|
||||
if (ObjectUtil.isNotEmpty(session)) {
|
||||
session.setLastActiveTime(System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
||||
// 用户ID为空不处理直接跳过
|
||||
if (ObjectUtil.isEmpty(WebSocketMessageDTO.getFormUserId())) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 维护通道是否已初始化
|
||||
SocketSession<GettySocketOperator> socketSession = SessionCenter.getSessionBySessionId(socketChannel.getChannelId());
|
||||
if (ObjectUtil.isEmpty(socketSession) && ClientMessageTypeEnum.USER_CONNECTION_AUTHENTICATION.getCode().equals(WebSocketMessageDTO.getClientMsgType())) {
|
||||
// 操作api包装
|
||||
GettySocketOperator gettySocketOperator = new GettySocketOperator(socketChannel);
|
||||
|
||||
// 回复消息
|
||||
WebSocketMessageDTO replyMsg = new WebSocketMessageDTO();
|
||||
replyMsg.setServerMsgType(ServerMessageTypeEnum.SYS_REPLY_MSG_TYPE.getCode());
|
||||
replyMsg.setToUserId(WebSocketMessageDTO.getFormUserId());
|
||||
|
||||
try {
|
||||
// 校验token是否合法
|
||||
JwtContext.me().validateTokenWithException(WebSocketMessageDTO.getData().toString());
|
||||
|
||||
// 设置回复内容
|
||||
replyMsg.setData(socketChannel.getChannelId());
|
||||
|
||||
// 创建会话对象
|
||||
socketSession = new SocketSession<>();
|
||||
socketSession.setSessionId(socketChannel.getChannelId());
|
||||
socketSession.setUserId(WebSocketMessageDTO.getFormUserId());
|
||||
socketSession.setSocketOperatorApi(gettySocketOperator);
|
||||
socketSession.setConnectionTime(System.currentTimeMillis());
|
||||
|
||||
// 维护会话
|
||||
SessionCenter.addSocketSession(socketSession);
|
||||
} finally {
|
||||
// 回复消息
|
||||
gettySocketOperator.writeAndFlush(replyMsg);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 会话建立成功执行业务逻辑
|
||||
if (ObjectUtil.isNotEmpty(socketSession)) {
|
||||
|
||||
// 更新最后会话时间
|
||||
socketSession.setLastActiveTime(System.currentTimeMillis());
|
||||
|
||||
// 找到该消息的处理器
|
||||
SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(WebSocketMessageDTO.getClientMsgType());
|
||||
if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
|
||||
// 触发回调
|
||||
socketMsgCallbackInterface.callback(WebSocketMessageDTO.getClientMsgType(), WebSocketMessageDTO, socketSession);
|
||||
} else {
|
||||
socketChannel.writeAndFlush(new TextWebSocketFrame("{\"serverMsgType\":\"404\"}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -25,7 +25,7 @@
|
|||
package cn.stylefeng.roses.kernel.socket.starter;
|
||||
|
||||
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.operator.WebSocketOperator;
|
||||
import cn.stylefeng.roses.kernel.socket.business.websocket.operator.WebSocketOperator;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
|
Loading…
Reference in New Issue