mirror of https://gitee.com/stylefeng/roses
parent
7858b13c29
commit
0fe4cbcd6b
|
@ -38,4 +38,10 @@ public interface SymbolConstant {
|
|||
|
||||
String RIGHT_SQUARE_BRACKETS = "]";
|
||||
|
||||
String DOLLAR = "$";
|
||||
|
||||
String PERCENT = "%";
|
||||
|
||||
String AND = "&";
|
||||
|
||||
}
|
||||
|
|
|
@ -39,6 +39,12 @@
|
|||
<version>${roses.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!--WebSocket模块的Api-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>socket-api</artifactId>
|
||||
<version>${roses.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -29,6 +29,7 @@ import cn.hutool.core.util.StrUtil;
|
|||
import cn.stylefeng.roses.kernel.auth.api.enums.DataScopeTypeEnum;
|
||||
import cn.stylefeng.roses.kernel.auth.api.pojo.login.basic.SimpleRoleInfo;
|
||||
import cn.stylefeng.roses.kernel.auth.api.pojo.login.basic.SimpleUserInfo;
|
||||
import cn.stylefeng.roses.kernel.config.api.context.ConfigContext;
|
||||
import cn.stylefeng.roses.kernel.rule.constants.RuleConstants;
|
||||
import cn.stylefeng.roses.kernel.scanner.api.annotation.field.ChineseDescription;
|
||||
import lombok.Data;
|
||||
|
@ -37,6 +38,8 @@ import java.io.Serializable;
|
|||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static cn.stylefeng.roses.kernel.socket.api.constants.SocketConstants.SOCKET_PORT;
|
||||
|
||||
/**
|
||||
* 登录用户信息
|
||||
*
|
||||
|
@ -162,9 +165,11 @@ public class LoginUser implements Serializable {
|
|||
|
||||
public String getWsUrl() {
|
||||
AtomicReference<String> returnUrl = new AtomicReference<>(StrUtil.EMPTY);
|
||||
Integer socketPort = ConfigContext.me().getSysConfigValueWithDefault(SOCKET_PORT, Integer.class, 11130);
|
||||
Optional.ofNullable(this.wsUrl).ifPresent(url -> {
|
||||
Map<String, Long> user = new HashMap<>(1);
|
||||
user.put("userId", this.userId);
|
||||
user.put("port", Long.valueOf(socketPort));
|
||||
returnUrl.set(StrUtil.format(url, user));
|
||||
});
|
||||
return returnUrl.get();
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
package cn.stylefeng.roses.kernel.socket.api;
|
||||
|
||||
import cn.stylefeng.roses.kernel.socket.api.enums.ServerMessageTypeEnum;
|
||||
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
|
||||
|
||||
/**
|
||||
* Socket通用操作类
|
||||
* <p>
|
||||
|
@ -13,12 +16,12 @@ public interface SocketOperatorApi {
|
|||
/**
|
||||
* 发送消息到指定会话
|
||||
*
|
||||
* @param sessionId 会话ID(会话ID具体看业务的实现,WebSocket中使用的是用户传入的formId)
|
||||
* @param userId 用户ID
|
||||
* @param msg 消息体
|
||||
* @author majianguo
|
||||
* @date 2021/6/2 上午9:35
|
||||
**/
|
||||
void sendMsgOfSession(String sessionId, Object msg);
|
||||
void sendMsgOfUserSession(ServerMessageTypeEnum msgType, String userId, Object msg);
|
||||
|
||||
/**
|
||||
* 发送消息到所有会话
|
||||
|
@ -27,7 +30,7 @@ public interface SocketOperatorApi {
|
|||
* @author majianguo
|
||||
* @date 2021/6/2 上午9:35
|
||||
**/
|
||||
void sendMsgOfAllSession(Object msg);
|
||||
void sendMsgOfAllUserSession(ServerMessageTypeEnum msgType, Object msg);
|
||||
|
||||
/**
|
||||
* 监听指定类型消息
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
package cn.stylefeng.roses.kernel.socket.api.enums;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 客户端消息类型枚举
|
||||
*
|
||||
* @author majianguo
|
||||
* @date 2021/6/3 上午9:14
|
||||
*/
|
||||
@Getter
|
||||
public enum ClientMessageTypeEnum {
|
||||
|
||||
/**
|
||||
* 添加用户监听的消息类型
|
||||
* <p>
|
||||
* 用户根据业务动态新增一个监听的消息类型,监听后可收到该类型的消息推送
|
||||
*/
|
||||
USER_ADD_MSG_TYPE("200001", "用户添加一个监听的消息类型"),
|
||||
|
||||
/**
|
||||
* 用户心跳
|
||||
*/
|
||||
USER_HEART("299999", "用户心跳"),
|
||||
;
|
||||
|
||||
private final String code;
|
||||
|
||||
private final String name;
|
||||
|
||||
ClientMessageTypeEnum(String code, String name) {
|
||||
this.code = code;
|
||||
this.name = name;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
package cn.stylefeng.roses.kernel.socket.api.enums;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 服务端消息类型枚举
|
||||
*
|
||||
* @author majianguo
|
||||
* @date 2021/6/3 上午9:14
|
||||
*/
|
||||
@Getter
|
||||
public enum ServerMessageTypeEnum {
|
||||
|
||||
/**
|
||||
* 系统通知消息类型
|
||||
*/
|
||||
SYS_NOTICE_MSG_TYPE("100001", "系统通知消息类型"),
|
||||
;
|
||||
|
||||
private final String code;
|
||||
|
||||
private final String name;
|
||||
|
||||
ServerMessageTypeEnum(String code, String name) {
|
||||
this.code = code;
|
||||
this.name = name;
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package cn.stylefeng.roses.kernel.socket.api;
|
||||
package cn.stylefeng.roses.kernel.socket.api.message;
|
||||
|
||||
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
||||
|
|
@ -28,7 +28,7 @@ package cn.stylefeng.roses.kernel.socket.api.session;
|
|||
* socket会话操作接口
|
||||
* <p>
|
||||
* 该接口面向会话,须基于会话的通道调用。
|
||||
* 该接口支持扩展,可参考WebSocket模块中{@link cn.stylefeng.roses.kernel.socket.websocket.channel}包下的类
|
||||
* 该接口支持扩展,可参考WebSocket模块中{@link cn.stylefeng.roses.kernel.socket.websocket.operator.channel}包下的类
|
||||
*
|
||||
* @author majianguo
|
||||
* @date 2021/6/1 上午11:46
|
||||
|
|
|
@ -3,6 +3,7 @@ package cn.stylefeng.roses.kernel.socket.api.session.pojo;
|
|||
import cn.stylefeng.roses.kernel.socket.api.session.SocketSessionOperatorApi;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
|
@ -17,12 +18,12 @@ public class SocketSession<T extends SocketSessionOperatorApi> {
|
|||
/**
|
||||
* 会话唯一标识
|
||||
*/
|
||||
private String sessionId;
|
||||
private String userId;
|
||||
|
||||
/**
|
||||
* 该会话所有的监听消息类型
|
||||
*/
|
||||
private Set<String> messageTypes;
|
||||
private Set<String> messageTypes = new HashSet<>();
|
||||
|
||||
/**
|
||||
* 连接时间
|
||||
|
|
|
@ -1,17 +1,28 @@
|
|||
package cn.stylefeng.roses.kernel.socket.business.websocket.spring;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.stylefeng.roses.kernel.config.api.constants.ConfigConstants;
|
||||
import cn.stylefeng.roses.kernel.config.api.context.ConfigContext;
|
||||
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
|
||||
import cn.stylefeng.roses.kernel.socket.api.enums.ServerMessageTypeEnum;
|
||||
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.message.WebSocketMessagePOJO;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.server.WebSocketServer;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
|
||||
import com.gettyio.core.channel.config.ServerConfig;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.net.StandardSocketOptions;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
||||
import static cn.stylefeng.roses.kernel.socket.api.constants.SocketConstants.*;
|
||||
import static cn.stylefeng.roses.kernel.socket.api.enums.ClientMessageTypeEnum.*;
|
||||
|
||||
/**
|
||||
* Spring Boot启动完成拉起WebSocket
|
||||
|
@ -23,6 +34,9 @@ import static cn.stylefeng.roses.kernel.socket.api.constants.SocketConstants.*;
|
|||
@Slf4j
|
||||
public class WebSocketApplicationRunnerImpl implements ApplicationRunner {
|
||||
|
||||
@Autowired
|
||||
private SocketOperatorApi socketOperatorApi;
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
// 初始化配置对象
|
||||
|
@ -48,5 +62,14 @@ public class WebSocketApplicationRunnerImpl implements ApplicationRunner {
|
|||
WebSocketServer.run(aioServerConfig);
|
||||
|
||||
log.info("WebSocket Server Start Success!");
|
||||
|
||||
// 添加用户新增消息类型的回调
|
||||
socketOperatorApi.msgTypeCallback(USER_ADD_MSG_TYPE.getCode(), (msgType, msg, socketSession) -> {
|
||||
// 转换对象
|
||||
WebSocketMessagePOJO webSocketMessage = (WebSocketMessagePOJO)msg;
|
||||
|
||||
// 维护会话中心的消息类型
|
||||
SessionCenter.addSocketSessionMsgType(webSocketMessage.getData().toString(), webSocketMessage.getFormUserId());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.message;
|
||||
|
||||
import cn.stylefeng.roses.kernel.socket.api.SocketMsgCallbackInterface;
|
||||
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
|
|
@ -19,15 +19,15 @@ public class WebSocketMessagePOJO {
|
|||
/**
|
||||
* 目标Id
|
||||
*/
|
||||
private String toId;
|
||||
private String toUserId;
|
||||
|
||||
/**
|
||||
* 发送者ID
|
||||
*/
|
||||
private String formId;
|
||||
private String formUserId;
|
||||
|
||||
/**
|
||||
* 数据
|
||||
*/
|
||||
private String data;
|
||||
private Object data;
|
||||
}
|
||||
|
|
|
@ -1,16 +1,20 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.operator;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.stylefeng.roses.kernel.socket.api.SocketMsgCallbackInterface;
|
||||
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
|
||||
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
|
||||
import cn.stylefeng.roses.kernel.socket.api.enums.ServerMessageTypeEnum;
|
||||
import cn.stylefeng.roses.kernel.socket.api.exception.SocketException;
|
||||
import cn.stylefeng.roses.kernel.socket.api.exception.enums.SocketExceptionEnum;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.message.WebSocketMessagePOJO;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.message.SocketMessageCenter;
|
||||
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.gettyio.expansion.handler.codec.websocket.frame.TextWebSocketFrame;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* WebSocket操作实现类
|
||||
|
@ -23,26 +27,36 @@ import java.util.Collection;
|
|||
public class WebSocketOperator implements SocketOperatorApi {
|
||||
|
||||
@Override
|
||||
public void sendMsgOfSession(String sessionId, Object msg) {
|
||||
// 获取会话
|
||||
SocketSession<GettySocketOperator> socketSession = SessionCenter.getSessionById(sessionId);
|
||||
public void sendMsgOfUserSession(ServerMessageTypeEnum msgType, String userId, Object msg) {
|
||||
// 根据用户ID获取会话
|
||||
SocketSession<GettySocketOperator> socketSession = SessionCenter.getSessionByUserId(userId);
|
||||
if (ObjectUtil.isEmpty(socketSession)) {
|
||||
throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
|
||||
}
|
||||
|
||||
// 判断用户是否监听
|
||||
if (socketSession.getMessageTypes().contains(msgType.getCode())) {
|
||||
WebSocketMessagePOJO webSocketMessagePOJO = new WebSocketMessagePOJO();
|
||||
webSocketMessagePOJO.setData(msg);
|
||||
webSocketMessagePOJO.setType(msgType.getCode());
|
||||
// 发送内容
|
||||
socketSession.getSocketOperatorApi().writeAndFlush(msg);
|
||||
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessagePOJO);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendMsgOfAllSession(Object msg) {
|
||||
// 获取所有会话
|
||||
Collection<SocketSession<GettySocketOperator>> socketSessions = SessionCenter.getSocketSessionMap().values();
|
||||
if (ObjectUtil.isNotEmpty(socketSessions)) {
|
||||
public void sendMsgOfAllUserSession(ServerMessageTypeEnum msgType, Object msg) {
|
||||
// 获取监听该消息类型的所有会话
|
||||
List<SocketSession<GettySocketOperator>> socketSessionList = SessionCenter.getSocketSessionByMsgType(msgType.getCode());
|
||||
|
||||
if (ObjectUtil.isNotEmpty(socketSessionList)) {
|
||||
// 给所有会话发送消息
|
||||
for (SocketSession<?> socketSession : socketSessions) {
|
||||
for (SocketSession<GettySocketOperator> socketSession : socketSessionList) {
|
||||
WebSocketMessagePOJO webSocketMessagePOJO = new WebSocketMessagePOJO();
|
||||
webSocketMessagePOJO.setData(msg);
|
||||
webSocketMessagePOJO.setType(msgType.getCode());
|
||||
// 发送内容
|
||||
socketSession.getSocketOperatorApi().writeAndFlush(msg);
|
||||
socketSession.getSocketOperatorApi().writeAndFlush(webSocketMessagePOJO);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.operator.channel;
|
||||
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.message.WebSocketMessagePOJO;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.gettyio.core.channel.SocketChannel;
|
||||
import com.gettyio.expansion.handler.codec.websocket.frame.TextWebSocketFrame;
|
||||
|
||||
|
@ -24,23 +26,12 @@ public class GettySocketOperator implements GettyChannelExpandInterFace {
|
|||
|
||||
@Override
|
||||
public void writeAndFlush(Object obj) {
|
||||
if (obj instanceof String) {
|
||||
// 处理WebSocket的数据
|
||||
TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(obj.toString());
|
||||
TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(JSON.toJSONString(obj));
|
||||
socketChannel.writeAndFlush(textWebSocketFrame);
|
||||
return;
|
||||
}
|
||||
socketChannel.writeAndFlush(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeToChannel(Object obj) {
|
||||
if (obj instanceof String) {
|
||||
// 处理WebSocket的数据
|
||||
TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(obj.toString());
|
||||
socketChannel.writeToChannel(textWebSocketFrame);
|
||||
return;
|
||||
}
|
||||
socketChannel.writeToChannel(obj);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
package cn.stylefeng.roses.kernel.socket.websocket.server.handler;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.stylefeng.roses.kernel.socket.api.SocketMsgCallbackInterface;
|
||||
import cn.stylefeng.roses.kernel.rule.constants.RuleConstants;
|
||||
import cn.stylefeng.roses.kernel.rule.constants.SymbolConstant;
|
||||
import cn.stylefeng.roses.kernel.socket.api.enums.ClientMessageTypeEnum;
|
||||
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.message.SocketMessageCenter;
|
||||
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.session.SessionCenter;
|
||||
|
@ -55,13 +58,30 @@ public class WebSocketMessageHandler extends SimpleChannelInboundHandler<WebSock
|
|||
// 转换为Java对象
|
||||
WebSocketMessagePOJO webSocketMessagePOJO = JSON.toJavaObject(JSON.parseObject(data), WebSocketMessagePOJO.class);
|
||||
|
||||
// 心跳包
|
||||
if (ClientMessageTypeEnum.USER_HEART.getCode().equals(webSocketMessagePOJO.getType())) {
|
||||
// 更新用户最后活跃时间
|
||||
String userId = ChannelIdAndUserBindCenter.getUserId(socketChannel.getChannelId());
|
||||
if (ObjectUtil.isNotEmpty(userId)) {
|
||||
SocketSession<GettySocketOperator> session = SessionCenter.getSessionByUserId(userId);
|
||||
session.setLastActiveTime(System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
||||
// 用户ID为空不处理直接跳过
|
||||
if (ObjectUtil.isEmpty(webSocketMessagePOJO.getFormUserId())) {
|
||||
ChannelIdAndUserBindCenter.closed(socketChannel.getChannelId());
|
||||
socketChannel.close();
|
||||
return;
|
||||
}
|
||||
|
||||
// 维护通道和用户ID的绑定关系
|
||||
if (!ChannelIdAndUserBindCenter.isBind(webSocketMessagePOJO.getFormId())) {
|
||||
ChannelIdAndUserBindCenter.bind(socketChannel.getChannelId(), webSocketMessagePOJO.getFormId());
|
||||
if (!ChannelIdAndUserBindCenter.isBind(webSocketMessagePOJO.getFormUserId())) {
|
||||
ChannelIdAndUserBindCenter.bind(socketChannel.getChannelId(), webSocketMessagePOJO.getFormUserId());
|
||||
|
||||
// 创建api的会话对象
|
||||
SocketSession<GettySocketOperator> socketSession = new SocketSession<>();
|
||||
socketSession.setSessionId(webSocketMessagePOJO.getFormId());
|
||||
socketSession.setUserId(webSocketMessagePOJO.getFormUserId());
|
||||
socketSession.setSocketOperatorApi(new GettySocketOperator(socketChannel));
|
||||
socketSession.setConnectionTime(System.currentTimeMillis());
|
||||
socketSession.setLastActiveTime(System.currentTimeMillis());
|
||||
|
@ -70,11 +90,15 @@ public class WebSocketMessageHandler extends SimpleChannelInboundHandler<WebSock
|
|||
SessionCenter.addSocketSession(socketSession);
|
||||
}
|
||||
|
||||
// 更新最后会话时间
|
||||
SocketSession<GettySocketOperator> userSession = SessionCenter.getSessionByUserId(webSocketMessagePOJO.getFormUserId());
|
||||
userSession.setLastActiveTime(System.currentTimeMillis());
|
||||
|
||||
// 找到该消息的处理器
|
||||
SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(webSocketMessagePOJO.getType());
|
||||
if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
|
||||
// 获取会话
|
||||
SocketSession<GettySocketOperator> session = SessionCenter.getSessionById(webSocketMessagePOJO.getFormId());
|
||||
SocketSession<GettySocketOperator> session = SessionCenter.getSessionByUserId(webSocketMessagePOJO.getFormUserId());
|
||||
|
||||
// 触发回调
|
||||
socketMsgCallbackInterface.callback(webSocketMessagePOJO.getType(), webSocketMessagePOJO, session);
|
||||
|
|
|
@ -4,9 +4,7 @@ import cn.hutool.core.util.ObjectUtil;
|
|||
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
|
||||
import cn.stylefeng.roses.kernel.socket.websocket.operator.channel.GettySocketOperator;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
|
@ -26,9 +24,9 @@ public class SessionCenter {
|
|||
private static ConcurrentMap<String, SocketSession<GettySocketOperator>> socketSessionMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 消息类型和会话ID关系维护
|
||||
* 消息类型和用户ID关系维护
|
||||
*/
|
||||
private static ConcurrentMap<String, List<String>> messageTypeSessionMap = new ConcurrentHashMap<>();
|
||||
private static ConcurrentMap<String, Set<String>> messageTypeSessionMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 获取维护的所有会话
|
||||
|
@ -42,26 +40,26 @@ public class SessionCenter {
|
|||
}
|
||||
|
||||
/**
|
||||
* 获取消息和会话ID的完整映射关系
|
||||
* 获取消息和用户ID的完整映射关系
|
||||
*
|
||||
* @return {@link ConcurrentMap< String, List< String>>}
|
||||
* @return {@link ConcurrentMap< String, Set<String>>}
|
||||
* @author majianguo
|
||||
* @date 2021/6/1 下午2:14
|
||||
**/
|
||||
public static ConcurrentMap<String, List<String>> getMessageTypeSessionMap() {
|
||||
public static ConcurrentMap<String, Set<String>> getMessageTypeSessionMap() {
|
||||
return messageTypeSessionMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据会话ID获取会话详情
|
||||
* 根据用户ID获取会话详情
|
||||
*
|
||||
* @param sessionId 会话ID
|
||||
* @param userId 用户ID
|
||||
* @return {@link SocketSession <GettySocketOperator>}
|
||||
* @author majianguo
|
||||
* @date 2021/6/1 下午1:48
|
||||
**/
|
||||
public static SocketSession<GettySocketOperator> getSessionById(String sessionId) {
|
||||
return socketSessionMap.get(sessionId);
|
||||
public static SocketSession<GettySocketOperator> getSessionByUserId(String userId) {
|
||||
return socketSessionMap.get(userId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -72,18 +70,19 @@ public class SessionCenter {
|
|||
* @date 2021/6/1 下午1:49
|
||||
**/
|
||||
public static void addSocketSession(SocketSession<GettySocketOperator> socketSession) {
|
||||
|
||||
// 维护会话
|
||||
socketSessionMap.put(socketSession.getSessionId(), socketSession);
|
||||
socketSessionMap.put(socketSession.getUserId(), socketSession);
|
||||
|
||||
// 维护会话所有的消息类型和会话的关系
|
||||
if (ObjectUtil.isNotEmpty(socketSession.getMessageTypes())) {
|
||||
for (String messageType : socketSession.getMessageTypes()) {
|
||||
List<String> sessionIds = messageTypeSessionMap.get(messageType);
|
||||
if (ObjectUtil.isEmpty(sessionIds)) {
|
||||
sessionIds = new ArrayList<>();
|
||||
messageTypeSessionMap.put(messageType, sessionIds);
|
||||
Set<String> userIds = messageTypeSessionMap.get(messageType);
|
||||
if (ObjectUtil.isEmpty(userIds)) {
|
||||
userIds = new HashSet<>();
|
||||
messageTypeSessionMap.put(messageType, userIds);
|
||||
}
|
||||
sessionIds.add(socketSession.getSessionId());
|
||||
userIds.add(socketSession.getUserId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -99,10 +98,10 @@ public class SessionCenter {
|
|||
List<SocketSession<GettySocketOperator>> res = new ArrayList<>();
|
||||
|
||||
// 获取监听该消息所有的会话
|
||||
List<String> stringList = messageTypeSessionMap.get(msgType);
|
||||
if (ObjectUtil.isNotEmpty(stringList)) {
|
||||
for (String sessionId : stringList) {
|
||||
SocketSession<GettySocketOperator> socketSession = socketSessionMap.get(sessionId);
|
||||
Set<String> userIds = messageTypeSessionMap.get(msgType);
|
||||
if (ObjectUtil.isNotEmpty(userIds)) {
|
||||
for (String userId : userIds) {
|
||||
SocketSession<GettySocketOperator> socketSession = socketSessionMap.get(userId);
|
||||
res.add(socketSession);
|
||||
}
|
||||
}
|
||||
|
@ -114,28 +113,36 @@ public class SessionCenter {
|
|||
* 给会话添加监听的消息类型
|
||||
*
|
||||
* @param msgType 消息类型
|
||||
* @param sessionId 会话ID
|
||||
* @param userId 用户ID
|
||||
* @author majianguo
|
||||
* @date 2021/6/1 下午2:11
|
||||
**/
|
||||
public static void addSocketSessionMsgType(String msgType, String sessionId) {
|
||||
SocketSession<GettySocketOperator> socketSession = socketSessionMap.get(sessionId);
|
||||
public static void addSocketSessionMsgType(String msgType, String userId) {
|
||||
// 维护Session信息
|
||||
SocketSession<GettySocketOperator> socketSession = socketSessionMap.get(userId);
|
||||
if (ObjectUtil.isNotEmpty(socketSession)) {
|
||||
socketSession.getMessageTypes().add(msgType);
|
||||
}
|
||||
// 维护消息列表
|
||||
Set<String> userIds = messageTypeSessionMap.get(msgType);
|
||||
if (ObjectUtil.isEmpty(userIds)) {
|
||||
userIds = new HashSet<>();
|
||||
messageTypeSessionMap.put(msgType, userIds);
|
||||
}
|
||||
userIds.add(userId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接关闭
|
||||
*
|
||||
* @param sessionId 会话唯一标识
|
||||
* @param userId 用户ID
|
||||
* @author majianguo
|
||||
* @date 2021/6/1 下午3:25
|
||||
**/
|
||||
public static void closed(String sessionId) {
|
||||
socketSessionMap.remove(sessionId);
|
||||
for (Map.Entry<String, List<String>> stringListEntry : messageTypeSessionMap.entrySet()) {
|
||||
stringListEntry.getValue().removeIf(item -> item.equals(sessionId));
|
||||
public static void closed(String userId) {
|
||||
socketSessionMap.remove(userId);
|
||||
for (Map.Entry<String, Set<String>> stringListEntry : messageTypeSessionMap.entrySet()) {
|
||||
stringListEntry.getValue().removeIf(item -> item.equals(userId));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,8 +33,8 @@ import org.springframework.context.annotation.Configuration;
|
|||
/**
|
||||
* Socket的自动配置类
|
||||
*
|
||||
* @author fengshuonan
|
||||
* @date 2020/12/1 21:18
|
||||
* @author majianguo
|
||||
* @date 2021/6/2 下午5:48
|
||||
*/
|
||||
@Configuration
|
||||
public class GunsSocketAutoConfiguration {
|
||||
|
@ -42,9 +42,9 @@ public class GunsSocketAutoConfiguration {
|
|||
/**
|
||||
* Socket操作实现类
|
||||
*
|
||||
* @return {@link SocketOperatorApi}
|
||||
* @return {@link cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi}
|
||||
* @author majianguo
|
||||
* @date 2021/6/2 上午11:02
|
||||
* @date 2021/6/2 下午5:48
|
||||
**/
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(SocketOperatorApi.class)
|
||||
|
|
|
@ -1,49 +0,0 @@
|
|||
/*
|
||||
* Copyright [2020-2030] [https://www.stylefeng.cn]
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
* Guns采用APACHE LICENSE 2.0开源协议,您在使用过程中,需要注意以下几点:
|
||||
*
|
||||
* 1.请不要删除和修改根目录下的LICENSE文件。
|
||||
* 2.请不要删除和修改Guns源码头部的版权声明。
|
||||
* 3.请保留源码和相关描述文件的项目出处,作者声明等。
|
||||
* 4.分发源码时候,请注明软件出处 https://gitee.com/stylefeng/guns
|
||||
* 5.在修改包名,模块名称,项目代码等时,请注明软件出处 https://gitee.com/stylefeng/guns
|
||||
* 6.若您的项目无法满足以上几点,可申请商业授权
|
||||
*/
|
||||
package cn.stylefeng.roses.kernel.message.api;
|
||||
|
||||
import cn.stylefeng.roses.kernel.message.api.pojo.request.MessageSendRequest;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 系统消息websocket相关接口
|
||||
*
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/26 18:14
|
||||
*/
|
||||
public interface WebsocketApi {
|
||||
|
||||
/**
|
||||
* 发送websocket系统消息
|
||||
*
|
||||
* @param userIdList userId 集合
|
||||
* @param messageSendRequest 系统消息参数
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/26 18:17
|
||||
*/
|
||||
void sendWebSocketMessage(List<Long> userIdList, MessageSendRequest messageSendRequest);
|
||||
|
||||
}
|
|
@ -81,6 +81,11 @@ public class MessageSendRequest extends BaseRequest {
|
|||
@NotBlank(message = "业务类型不能为空", groups = {add.class, edit.class})
|
||||
private String businessType;
|
||||
|
||||
/**
|
||||
* 业务类型值
|
||||
*/
|
||||
private String businessTypeValue;
|
||||
|
||||
/**
|
||||
* 消息发送时间
|
||||
*/
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
websocket业务模块
|
|
@ -1,51 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>kernel-s-message</artifactId>
|
||||
<version>7.0.4</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>message-business-websocket</artifactId>
|
||||
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<!--auth鉴权模块的api-->
|
||||
<!--需要用auth模块的LoginContext获取当前登录用户,然后返回ws-url-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>auth-api</artifactId>
|
||||
<version>7.0.4</version>
|
||||
</dependency>
|
||||
|
||||
<!--资源api模块-->
|
||||
<!--用在资源控制器,资源扫描上-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>scanner-api</artifactId>
|
||||
<version>7.0.4</version>
|
||||
</dependency>
|
||||
|
||||
<!--参数校验模块-->
|
||||
<!--用在控制器,参数校验-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>validator-api</artifactId>
|
||||
<version>7.0.4</version>
|
||||
</dependency>
|
||||
|
||||
<!--web模块-->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -1,57 +0,0 @@
|
|||
/*
|
||||
* Copyright [2020-2030] [https://www.stylefeng.cn]
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
* Guns采用APACHE LICENSE 2.0开源协议,您在使用过程中,需要注意以下几点:
|
||||
*
|
||||
* 1.请不要删除和修改根目录下的LICENSE文件。
|
||||
* 2.请不要删除和修改Guns源码头部的版权声明。
|
||||
* 3.请保留源码和相关描述文件的项目出处,作者声明等。
|
||||
* 4.分发源码时候,请注明软件出处 https://gitee.com/stylefeng/guns
|
||||
* 5.在修改包名,模块名称,项目代码等时,请注明软件出处 https://gitee.com/stylefeng/guns
|
||||
* 6.若您的项目无法满足以上几点,可申请商业授权
|
||||
*/
|
||||
package cn.stylefeng.roses.kernel.message.modular.websocket;
|
||||
|
||||
import cn.stylefeng.roses.kernel.auth.api.context.LoginContext;
|
||||
import cn.stylefeng.roses.kernel.auth.api.pojo.login.LoginUser;
|
||||
import cn.stylefeng.roses.kernel.rule.pojo.response.ResponseData;
|
||||
import cn.stylefeng.roses.kernel.rule.pojo.response.SuccessResponseData;
|
||||
import cn.stylefeng.roses.kernel.scanner.api.annotation.ApiResource;
|
||||
import cn.stylefeng.roses.kernel.scanner.api.annotation.GetResource;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* websocket控制器
|
||||
*
|
||||
* @author liuhanqing
|
||||
* @date 2021/2/3 21:08
|
||||
*/
|
||||
@RestController
|
||||
@ApiResource(name = "webSocket控制器")
|
||||
public class WebSocketController {
|
||||
|
||||
/**
|
||||
* 获取登录用户ws-url
|
||||
*
|
||||
* @author liuhanqing
|
||||
* @date 2021/2/3 21:15
|
||||
*/
|
||||
@GetResource(name = "获取登录用户ws-url", path = "/webSocket/getWsUrl")
|
||||
public ResponseData getWsUrl() {
|
||||
LoginUser loginUser = LoginContext.me().getLoginUser();
|
||||
return new SuccessResponseData(loginUser.getWsUrl());
|
||||
}
|
||||
|
||||
}
|
|
@ -37,6 +37,13 @@
|
|||
<version>7.0.4</version>
|
||||
</dependency>
|
||||
|
||||
<!--引入WebSocket模块-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>socket-sdk-websocket</artifactId>
|
||||
<version>7.0.4</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -32,7 +32,6 @@ import cn.stylefeng.roses.kernel.auth.api.context.LoginContext;
|
|||
import cn.stylefeng.roses.kernel.auth.api.pojo.login.LoginUser;
|
||||
import cn.stylefeng.roses.kernel.db.api.pojo.page.PageResult;
|
||||
import cn.stylefeng.roses.kernel.message.api.MessageApi;
|
||||
import cn.stylefeng.roses.kernel.message.api.WebsocketApi;
|
||||
import cn.stylefeng.roses.kernel.message.api.constants.MessageConstants;
|
||||
import cn.stylefeng.roses.kernel.message.api.enums.MessageReadFlagEnum;
|
||||
import cn.stylefeng.roses.kernel.message.api.exception.MessageException;
|
||||
|
@ -43,8 +42,11 @@ import cn.stylefeng.roses.kernel.message.api.pojo.response.MessageResponse;
|
|||
import cn.stylefeng.roses.kernel.message.db.entity.SysMessage;
|
||||
import cn.stylefeng.roses.kernel.message.db.service.SysMessageService;
|
||||
import cn.stylefeng.roses.kernel.rule.enums.YesOrNotEnum;
|
||||
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
|
||||
import cn.stylefeng.roses.kernel.socket.api.enums.ServerMessageTypeEnum;
|
||||
import cn.stylefeng.roses.kernel.system.api.UserServiceApi;
|
||||
import cn.stylefeng.roses.kernel.system.api.pojo.user.request.SysUserRequest;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
@ -65,7 +67,7 @@ import java.util.stream.Collectors;
|
|||
public class MessageDbServiceImpl implements MessageApi {
|
||||
|
||||
@Resource
|
||||
private WebsocketApi websocketApi;
|
||||
private SocketOperatorApi socketOperatorApi;
|
||||
|
||||
@Resource
|
||||
private UserServiceApi userServiceApi;
|
||||
|
@ -111,7 +113,10 @@ public class MessageDbServiceImpl implements MessageApi {
|
|||
}
|
||||
});
|
||||
|
||||
websocketApi.sendWebSocketMessage(ListUtil.toList(userIdSet), messageSendRequest);
|
||||
// 给用户发送通知
|
||||
for (Long userId : userIdSet) {
|
||||
socketOperatorApi.sendMsgOfUserSession(ServerMessageTypeEnum.SYS_NOTICE_MSG_TYPE, userId.toString(), messageSendRequest);
|
||||
}
|
||||
sysMessageService.saveBatch(sendMsgList);
|
||||
|
||||
}
|
||||
|
@ -134,10 +139,7 @@ public class MessageDbServiceImpl implements MessageApi {
|
|||
LoginUser loginUser = LoginContext.me().getLoginUser();
|
||||
Long userId = loginUser.getUserId();
|
||||
LambdaUpdateWrapper<SysMessage> updateWrapper = new LambdaUpdateWrapper<>();
|
||||
updateWrapper.set(SysMessage::getReadFlag, MessageReadFlagEnum.READ.getCode())
|
||||
.eq(SysMessage::getReadFlag, MessageReadFlagEnum.UNREAD.getCode())
|
||||
.eq(SysMessage::getReceiveUserId, userId)
|
||||
.set(SysMessage::getDelFlag, YesOrNotEnum.N.getCode());
|
||||
updateWrapper.set(SysMessage::getReadFlag, MessageReadFlagEnum.READ.getCode()).eq(SysMessage::getReadFlag, MessageReadFlagEnum.UNREAD.getCode()).eq(SysMessage::getReceiveUserId, userId).set(SysMessage::getDelFlag, YesOrNotEnum.N.getCode());
|
||||
sysMessageService.update(updateWrapper);
|
||||
|
||||
}
|
||||
|
@ -156,8 +158,7 @@ public class MessageDbServiceImpl implements MessageApi {
|
|||
public void deleteByMessageId(Long messageId) {
|
||||
LambdaUpdateWrapper<SysMessage> updateWrapper = new LambdaUpdateWrapper<>();
|
||||
// 修改为逻辑删除
|
||||
updateWrapper.eq(SysMessage::getMessageId, messageId)
|
||||
.set(SysMessage::getDelFlag, YesOrNotEnum.Y.getCode());
|
||||
updateWrapper.eq(SysMessage::getMessageId, messageId).set(SysMessage::getDelFlag, YesOrNotEnum.Y.getCode());
|
||||
sysMessageService.update(updateWrapper);
|
||||
}
|
||||
|
||||
|
@ -165,8 +166,7 @@ public class MessageDbServiceImpl implements MessageApi {
|
|||
@Transactional(rollbackFor = Exception.class)
|
||||
public void batchDeleteByMessageIds(String messageIds) {
|
||||
LambdaUpdateWrapper<SysMessage> updateWrapper = new LambdaUpdateWrapper<>();
|
||||
updateWrapper.inSql(SysMessage::getMessageId, messageIds)
|
||||
.set(SysMessage::getDelFlag, YesOrNotEnum.Y.getCode());
|
||||
updateWrapper.inSql(SysMessage::getMessageId, messageIds).set(SysMessage::getDelFlag, YesOrNotEnum.Y.getCode());
|
||||
sysMessageService.update(updateWrapper);
|
||||
}
|
||||
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
系统消息websocket的sdk,用于将消息发送给在线用户,并提供相应接口
|
|
@ -1,41 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>kernel-s-message</artifactId>
|
||||
<version>7.0.4</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>message-sdk-websocket</artifactId>
|
||||
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<!--消息模块的api-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>message-api</artifactId>
|
||||
<version>7.0.4</version>
|
||||
</dependency>
|
||||
|
||||
<!--websocket 依赖-->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>auth-api</artifactId>
|
||||
<version>7.0.4</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -1,79 +0,0 @@
|
|||
/*
|
||||
* Copyright [2020-2030] [https://www.stylefeng.cn]
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
* Guns采用APACHE LICENSE 2.0开源协议,您在使用过程中,需要注意以下几点:
|
||||
*
|
||||
* 1.请不要删除和修改根目录下的LICENSE文件。
|
||||
* 2.请不要删除和修改Guns源码头部的版权声明。
|
||||
* 3.请保留源码和相关描述文件的项目出处,作者声明等。
|
||||
* 4.分发源码时候,请注明软件出处 https://gitee.com/stylefeng/guns
|
||||
* 5.在修改包名,模块名称,项目代码等时,请注明软件出处 https://gitee.com/stylefeng/guns
|
||||
* 6.若您的项目无法满足以上几点,可申请商业授权
|
||||
*/
|
||||
package cn.stylefeng.roses.kernel.message.websocket;
|
||||
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import cn.stylefeng.roses.kernel.auth.api.context.LoginContext;
|
||||
import cn.stylefeng.roses.kernel.auth.api.pojo.login.LoginUser;
|
||||
import cn.stylefeng.roses.kernel.message.api.WebsocketApi;
|
||||
import cn.stylefeng.roses.kernel.message.api.enums.MessageReadFlagEnum;
|
||||
import cn.stylefeng.roses.kernel.message.api.pojo.request.MessageSendRequest;
|
||||
import cn.stylefeng.roses.kernel.message.api.pojo.response.MessageResponse;
|
||||
import cn.stylefeng.roses.kernel.message.websocket.manager.WebSocketManager;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 系统消息websocket
|
||||
*
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/2 22:00
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class WebSocketServiceImpl implements WebsocketApi {
|
||||
|
||||
|
||||
public final static ObjectMapper MAPPER;
|
||||
|
||||
static {
|
||||
MAPPER = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendWebSocketMessage(List<Long> userIdList, MessageSendRequest messageSendRequest) {
|
||||
// 获取当前登录人
|
||||
LoginUser loginUser = LoginContext.me().getLoginUser();
|
||||
try {
|
||||
MessageResponse sysMessage = new MessageResponse();
|
||||
BeanUtil.copyProperties(messageSendRequest, sysMessage);
|
||||
sysMessage.setReadFlag(MessageReadFlagEnum.UNREAD.getCode());
|
||||
sysMessage.setSendUserId(loginUser.getUserId());
|
||||
String msgInfo = MAPPER.writeValueAsString(sysMessage);
|
||||
|
||||
for (Long userId : userIdList) {
|
||||
WebSocketManager.sendMessage(userId, msgInfo);
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("发送websocket异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,129 +0,0 @@
|
|||
/*
|
||||
* Copyright [2020-2030] [https://www.stylefeng.cn]
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
* Guns采用APACHE LICENSE 2.0开源协议,您在使用过程中,需要注意以下几点:
|
||||
*
|
||||
* 1.请不要删除和修改根目录下的LICENSE文件。
|
||||
* 2.请不要删除和修改Guns源码头部的版权声明。
|
||||
* 3.请保留源码和相关描述文件的项目出处,作者声明等。
|
||||
* 4.分发源码时候,请注明软件出处 https://gitee.com/stylefeng/guns
|
||||
* 5.在修改包名,模块名称,项目代码等时,请注明软件出处 https://gitee.com/stylefeng/guns
|
||||
* 6.若您的项目无法满足以上几点,可申请商业授权
|
||||
*/
|
||||
package cn.stylefeng.roses.kernel.message.websocket.manager;
|
||||
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import javax.websocket.Session;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* websocket客户端连接管理
|
||||
*
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/24 22:08
|
||||
*/
|
||||
public class WebSocketManager {
|
||||
|
||||
private static final ConcurrentHashMap<Long, List<Session>> userIdSessionMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 添加用户ID相关的Session
|
||||
*
|
||||
* @param userId 用户id
|
||||
* @param session 用户websocketSession
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/24 22:08
|
||||
*/
|
||||
public static void add(Long userId, Session session) {
|
||||
userIdSessionMap.computeIfAbsent(userId, v -> new ArrayList<>()).add(session);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据用户ID获取Session
|
||||
*
|
||||
* @param userId 用户id
|
||||
* @return List<Session> 用户websocketSession集合
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/24 22:10
|
||||
*/
|
||||
public static List<Session> getSessionByUserId(Long userId) {
|
||||
return userIdSessionMap.get(userId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除失效的Session
|
||||
*
|
||||
* @param userId 用户id
|
||||
* @param session 用户websocketSession
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/24 22:11
|
||||
*/
|
||||
public static void removeSession(Long userId, Session session) {
|
||||
if (session == null) {
|
||||
return;
|
||||
}
|
||||
List<Session> webSessoin = userIdSessionMap.get(userId);
|
||||
if (webSessoin == null || CollectionUtils.isEmpty(webSessoin)) {
|
||||
return;
|
||||
}
|
||||
webSessoin.remove(session);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取链接用户集合
|
||||
*
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/24 22:11
|
||||
*/
|
||||
public static Set<Long> getUserList() {
|
||||
return userIdSessionMap.keySet();
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息
|
||||
*
|
||||
* @param userId 用户id
|
||||
* @param message 消息
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/24 22:11
|
||||
*/
|
||||
public static void sendMessage(Long userId, String message) {
|
||||
List<Session> sessionList = getSessionByUserId(userId);
|
||||
// 增加判断不为空
|
||||
if (!CollectionUtils.isEmpty(sessionList)) {
|
||||
for (Session userSession : sessionList) {
|
||||
userSession.getAsyncRemote().sendText(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息
|
||||
*
|
||||
* @param message 消息
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/24 22:11
|
||||
*/
|
||||
public static void sendMessageToAll(String message) {
|
||||
for (Long userId : WebSocketManager.getUserList()) {
|
||||
sendMessage(userId, message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,105 +0,0 @@
|
|||
/*
|
||||
* Copyright [2020-2030] [https://www.stylefeng.cn]
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
* Guns采用APACHE LICENSE 2.0开源协议,您在使用过程中,需要注意以下几点:
|
||||
*
|
||||
* 1.请不要删除和修改根目录下的LICENSE文件。
|
||||
* 2.请不要删除和修改Guns源码头部的版权声明。
|
||||
* 3.请保留源码和相关描述文件的项目出处,作者声明等。
|
||||
* 4.分发源码时候,请注明软件出处 https://gitee.com/stylefeng/guns
|
||||
* 5.在修改包名,模块名称,项目代码等时,请注明软件出处 https://gitee.com/stylefeng/guns
|
||||
* 6.若您的项目无法满足以上几点,可申请商业授权
|
||||
*/
|
||||
package cn.stylefeng.roses.kernel.message.websocket.server;
|
||||
|
||||
import cn.stylefeng.roses.kernel.message.websocket.manager.WebSocketManager;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.websocket.*;
|
||||
import javax.websocket.server.PathParam;
|
||||
import javax.websocket.server.ServerEndpoint;
|
||||
|
||||
/**
|
||||
* websocket服务端
|
||||
*
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/24 22:26
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ServerEndpoint("/message/websocket/{userId}")
|
||||
public class WebSocketEndpoint {
|
||||
|
||||
/**
|
||||
* 连接建立成功后调用
|
||||
*
|
||||
* @param userId 用户id
|
||||
* @param session 用户websocketSession
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/24 22:27
|
||||
*/
|
||||
@OnOpen
|
||||
public void onOpen(@PathParam(value = "userId") Long userId, Session session) {
|
||||
|
||||
// 添加到链接管理
|
||||
WebSocketManager.add(userId, session);
|
||||
|
||||
// 返回消息
|
||||
session.getAsyncRemote().sendText("WebSocket连接成功");
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接关闭时调用
|
||||
*
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/24 22:29
|
||||
*/
|
||||
@OnClose
|
||||
public void onClose(@PathParam(value = "userId") Long userId, Session session) {
|
||||
// 从map中删除
|
||||
WebSocketManager.removeSession(userId, session);
|
||||
}
|
||||
|
||||
/**
|
||||
* 收到客户端消息后调用
|
||||
*
|
||||
* @param message 客户端发送过来的消息
|
||||
* @param session 用户websocketSession
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/24 22:29
|
||||
*/
|
||||
@OnMessage
|
||||
public void onMessage(String message, Session session) {
|
||||
log.debug("来自客户端的消息:" + message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发生错误时回调
|
||||
*
|
||||
* @param session 用户信息
|
||||
* @param error 错误
|
||||
* @author liuhanqing
|
||||
* @date 2021/1/24 22:29
|
||||
*/
|
||||
@OnError
|
||||
public void onError(Session session, Throwable error) {
|
||||
log.error("WebSocket发生错误");
|
||||
if (log.isDebugEnabled()) {
|
||||
error.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -24,13 +24,6 @@
|
|||
<version>7.0.4</version>
|
||||
</dependency>
|
||||
|
||||
<!--websocket管理-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>message-business-websocket</artifactId>
|
||||
<version>7.0.4</version>
|
||||
</dependency>
|
||||
|
||||
<!--消息默认记录到库中-->
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
|
@ -38,12 +31,6 @@
|
|||
<version>7.0.4</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.stylefeng.roses</groupId>
|
||||
<artifactId>message-sdk-websocket</artifactId>
|
||||
<version>7.0.4</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -24,11 +24,7 @@
|
|||
*/
|
||||
package cn.stylefeng.roses.kernel.message.starter;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
|
||||
|
||||
/**
|
||||
* 系统消息的自动配置
|
||||
|
@ -39,20 +35,4 @@ import org.springframework.web.socket.server.standard.ServerEndpointExporter;
|
|||
@Configuration
|
||||
public class GunsMessageAutoConfiguration {
|
||||
|
||||
public static final String WEB_SOCKET_PREFIX = "web-socket";
|
||||
|
||||
/**
|
||||
* 开启WebSocket功能
|
||||
*
|
||||
* @return serverEndpointExporter
|
||||
* @author liuhanqing
|
||||
* @date 2021/01/24 22:09
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(ServerEndpointExporter.class)
|
||||
@ConditionalOnProperty(prefix = WEB_SOCKET_PREFIX, name = "open", havingValue = "true")
|
||||
public ServerEndpointExporter serverEndpointExporter() {
|
||||
return new ServerEndpointExporter();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,9 +18,7 @@
|
|||
<modules>
|
||||
<module>message-api</module>
|
||||
<module>message-business</module>
|
||||
<module>message-business-websocket</module>
|
||||
<module>message-sdk-db</module>
|
||||
<module>message-sdk-websocket</module>
|
||||
<module>message-spring-boot-starter</module>
|
||||
</modules>
|
||||
|
||||
|
|
|
@ -194,6 +194,7 @@ public class SysNoticeServiceImpl extends ServiceImpl<SysNoticeMapper, SysNotice
|
|||
|
||||
// 消息业务类型
|
||||
message.setBusinessType(MessageBusinessTypeEnum.SYS_NOTICE.getCode());
|
||||
message.setBusinessTypeValue(MessageBusinessTypeEnum.SYS_NOTICE.getName());
|
||||
|
||||
message.setBusinessId(sysNotice.getNoticeId());
|
||||
message.setMessageSendTime(new Date());
|
||||
|
|
|
@ -68,6 +68,9 @@ public class IndexService {
|
|||
// 获取登录用户ws-url
|
||||
renderMap.put("wsUrl", loginUser.getWsUrl());
|
||||
|
||||
// 获取登录用户ID
|
||||
renderMap.put("userId", loginUser.getUserId());
|
||||
|
||||
// 未读消息数量
|
||||
MessageRequest messageRequest = new MessageRequest();
|
||||
messageRequest.setReadFlag(MessageReadFlagEnum.UNREAD.getCode());
|
||||
|
|
Loading…
Reference in New Issue