mirror of https://gitee.com/stylefeng/roses
更新WebSocket操作API实现
parent
72cfc96ad5
commit
d5cd8bb072
|
@ -4,6 +4,8 @@ import lombok.Getter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 客户端消息类型枚举
|
* 客户端消息类型枚举
|
||||||
|
* <p>
|
||||||
|
* 用户根据业务动态新增一个监听的消息类型,监听后可收到该类型的消息推送
|
||||||
*
|
*
|
||||||
* @author majianguo
|
* @author majianguo
|
||||||
* @date 2021/6/3 上午9:14
|
* @date 2021/6/3 上午9:14
|
||||||
|
@ -13,8 +15,6 @@ public enum ClientMessageTypeEnum {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 添加用户监听的消息类型
|
* 添加用户监听的消息类型
|
||||||
* <p>
|
|
||||||
* 用户根据业务动态新增一个监听的消息类型,监听后可收到该类型的消息推送
|
|
||||||
*/
|
*/
|
||||||
USER_ADD_MSG_TYPE("200001", "用户添加一个监听的消息类型"),
|
USER_ADD_MSG_TYPE("200001", "用户添加一个监听的消息类型"),
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,17 @@
|
||||||
package cn.stylefeng.roses.kernel.socket.websocket.message;
|
package cn.stylefeng.roses.kernel.socket.websocket.message;
|
||||||
|
|
||||||
|
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* WebSocket交互通用对象
|
* WebSocket交互通用对象
|
||||||
|
* <p>
|
||||||
|
* 特殊说明一下serverMsgType和clientMsgType的区别
|
||||||
|
* 1.serverMsgType字段是服务端发送给客户端的字段
|
||||||
|
* 例如:服务端发送一个系统消息(type:100001),客户端接收到该消息以后判断需不需要处理,不需要处理跳过即可
|
||||||
|
* 2.clientMsgType字段是客户端发送给服务器的字段
|
||||||
|
* 例如:客户端发送给服务器一个心跳消息(type:299999),服务端如果需要处理该消息就注册一个该消息的监听器,
|
||||||
|
* 那么收到消息服务端会把消息推送给对应的监听器,接口见{@link SocketOperatorApi#msgTypeCallback}
|
||||||
*
|
*
|
||||||
* @author majianguo
|
* @author majianguo
|
||||||
* @date 2021/6/1 下午2:56
|
* @date 2021/6/1 下午2:56
|
||||||
|
@ -12,9 +20,14 @@ import lombok.Data;
|
||||||
public class WebSocketMessagePOJO {
|
public class WebSocketMessagePOJO {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 消息类型
|
* 服务端发送的消息类型(客户端如果需要监听该消息类型,注册对应的消息处理器即可)
|
||||||
*/
|
*/
|
||||||
private String type;
|
private String serverMsgType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 客户端发送的消息类型(服务端需要处理的消息类型)
|
||||||
|
*/
|
||||||
|
private String clientMsgType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 目标Id
|
* 目标Id
|
||||||
|
|
|
@ -33,15 +33,11 @@ public class WebSocketOperator implements SocketOperatorApi {
|
||||||
if (ObjectUtil.isEmpty(socketSession)) {
|
if (ObjectUtil.isEmpty(socketSession)) {
|
||||||
throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
|
throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
|
||||||
}
|
}
|
||||||
|
WebSocketMessagePOJO webSocketMessagePOJO = new WebSocketMessagePOJO();
|
||||||
// 判断用户是否监听
|
webSocketMessagePOJO.setData(msg);
|
||||||
if (socketSession.getMessageTypes().contains(msgType)) {
|
webSocketMessagePOJO.setServerMsgType(msgType);
|
||||||
WebSocketMessagePOJO webSocketMessagePOJO = new WebSocketMessagePOJO();
|
// 发送内容
|
||||||
webSocketMessagePOJO.setData(msg);
|
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessagePOJO);
|
||||||
webSocketMessagePOJO.setType(msgType);
|
|
||||||
// 发送内容
|
|
||||||
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessagePOJO);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -54,7 +50,7 @@ public class WebSocketOperator implements SocketOperatorApi {
|
||||||
for (SocketSession<GettySocketOperator> socketSession : socketSessionList) {
|
for (SocketSession<GettySocketOperator> socketSession : socketSessionList) {
|
||||||
WebSocketMessagePOJO webSocketMessagePOJO = new WebSocketMessagePOJO();
|
WebSocketMessagePOJO webSocketMessagePOJO = new WebSocketMessagePOJO();
|
||||||
webSocketMessagePOJO.setData(msg);
|
webSocketMessagePOJO.setData(msg);
|
||||||
webSocketMessagePOJO.setType(msgType);
|
webSocketMessagePOJO.setServerMsgType(msgType);
|
||||||
// 发送内容
|
// 发送内容
|
||||||
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessagePOJO);
|
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessagePOJO);
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,7 @@ public class WebSocketMessageHandler extends SimpleChannelInboundHandler<WebSock
|
||||||
WebSocketMessagePOJO webSocketMessagePOJO = JSON.toJavaObject(JSON.parseObject(data), WebSocketMessagePOJO.class);
|
WebSocketMessagePOJO webSocketMessagePOJO = JSON.toJavaObject(JSON.parseObject(data), WebSocketMessagePOJO.class);
|
||||||
|
|
||||||
// 心跳包
|
// 心跳包
|
||||||
if (ClientMessageTypeEnum.USER_HEART.getCode().equals(webSocketMessagePOJO.getType())) {
|
if (ClientMessageTypeEnum.USER_HEART.getCode().equals(webSocketMessagePOJO.getClientMsgType())) {
|
||||||
// 更新用户最后活跃时间
|
// 更新用户最后活跃时间
|
||||||
String userId = ChannelIdAndUserBindCenter.getUserId(socketChannel.getChannelId());
|
String userId = ChannelIdAndUserBindCenter.getUserId(socketChannel.getChannelId());
|
||||||
if (ObjectUtil.isNotEmpty(userId)) {
|
if (ObjectUtil.isNotEmpty(userId)) {
|
||||||
|
@ -70,8 +70,6 @@ public class WebSocketMessageHandler extends SimpleChannelInboundHandler<WebSock
|
||||||
|
|
||||||
// 用户ID为空不处理直接跳过
|
// 用户ID为空不处理直接跳过
|
||||||
if (ObjectUtil.isEmpty(webSocketMessagePOJO.getFormUserId())) {
|
if (ObjectUtil.isEmpty(webSocketMessagePOJO.getFormUserId())) {
|
||||||
ChannelIdAndUserBindCenter.closed(socketChannel.getChannelId());
|
|
||||||
socketChannel.close();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,13 +93,13 @@ public class WebSocketMessageHandler extends SimpleChannelInboundHandler<WebSock
|
||||||
userSession.setLastActiveTime(System.currentTimeMillis());
|
userSession.setLastActiveTime(System.currentTimeMillis());
|
||||||
|
|
||||||
// 找到该消息的处理器
|
// 找到该消息的处理器
|
||||||
SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(webSocketMessagePOJO.getType());
|
SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(webSocketMessagePOJO.getClientMsgType());
|
||||||
if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
|
if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
|
||||||
// 获取会话
|
// 获取会话
|
||||||
SocketSession<GettySocketOperator> session = SessionCenter.getSessionByUserId(webSocketMessagePOJO.getFormUserId());
|
SocketSession<GettySocketOperator> session = SessionCenter.getSessionByUserId(webSocketMessagePOJO.getFormUserId());
|
||||||
|
|
||||||
// 触发回调
|
// 触发回调
|
||||||
socketMsgCallbackInterface.callback(webSocketMessagePOJO.getType(), webSocketMessagePOJO, session);
|
socketMsgCallbackInterface.callback(webSocketMessagePOJO.getClientMsgType(), webSocketMessagePOJO, session);
|
||||||
} else {
|
} else {
|
||||||
socketChannel.writeAndFlush(new TextWebSocketFrame("{\"code\":\"404\"}"));
|
socketChannel.writeAndFlush(new TextWebSocketFrame("{\"code\":\"404\"}"));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue