1.修复消息通知BUG

2.更改WebSocket通信流程
pull/22/head
rays 2021-06-22 15:36:20 +08:00
parent 88c14e119f
commit e6f887294d
10 changed files with 127 additions and 65 deletions

View File

@ -1,5 +1,6 @@
package cn.stylefeng.roses.kernel.socket.api; package cn.stylefeng.roses.kernel.socket.api;
import cn.stylefeng.roses.kernel.socket.api.exception.SocketException;
import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface; import cn.stylefeng.roses.kernel.socket.api.message.SocketMsgCallbackInterface;
/** /**
@ -21,7 +22,7 @@ public interface SocketOperatorApi {
* @author majianguo * @author majianguo
* @date 2021/6/11 2:19 * @date 2021/6/11 2:19
**/ **/
void sendMsgOfUserSessionBySessionId(String msgType, String sessionId, Object msg); void sendMsgOfUserSessionBySessionId(String msgType, String sessionId, Object msg) throws SocketException;
/** /**
* *
@ -34,7 +35,7 @@ public interface SocketOperatorApi {
* @author majianguo * @author majianguo
* @date 2021/6/2 9:35 * @date 2021/6/2 9:35
**/ **/
void sendMsgOfUserSession(String msgType, String userId, Object msg); void sendMsgOfUserSession(String msgType, String userId, Object msg) throws SocketException;
/** /**
* *

View File

@ -25,7 +25,7 @@ import java.util.List;
public class WebSocketOperator implements SocketOperatorApi { public class WebSocketOperator implements SocketOperatorApi {
@Override @Override
public void sendMsgOfUserSessionBySessionId(String msgType, String sessionId, Object msg) { public void sendMsgOfUserSessionBySessionId(String msgType, String sessionId, Object msg) throws SocketException {
SocketSession<GettySocketOperator> session = SessionCenter.getSessionBySessionId(sessionId); SocketSession<GettySocketOperator> session = SessionCenter.getSessionBySessionId(sessionId);
if (ObjectUtil.isEmpty(session)) { if (ObjectUtil.isEmpty(session)) {
throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST); throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
@ -37,7 +37,7 @@ public class WebSocketOperator implements SocketOperatorApi {
} }
@Override @Override
public void sendMsgOfUserSession(String msgType, String userId, Object msg) { public void sendMsgOfUserSession(String msgType, String userId, Object msg) throws SocketException {
// 根据用户ID获取会话 // 根据用户ID获取会话
List<SocketSession<GettySocketOperator>> socketSessionList = SessionCenter.getSessionByUserIdAndMsgType(userId); List<SocketSession<GettySocketOperator>> socketSessionList = SessionCenter.getSessionByUserIdAndMsgType(userId);
if (ObjectUtil.isEmpty(socketSessionList)) { if (ObjectUtil.isEmpty(socketSessionList)) {

View File

@ -26,7 +26,9 @@ public class GettySocketOperator implements GettyChannelExpandInterFace {
@Override @Override
public void writeAndFlush(Object obj) { public void writeAndFlush(Object obj) {
try { try {
socketChannel.getBasicRemote().sendText(JSON.toJSONString(obj)); if (socketChannel.isOpen()) {
socketChannel.getBasicRemote().sendText(JSON.toJSONString(obj));
}
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -34,13 +36,17 @@ public class GettySocketOperator implements GettyChannelExpandInterFace {
@Override @Override
public void writeToChannel(Object obj) { public void writeToChannel(Object obj) {
socketChannel.getAsyncRemote().sendText(JSON.toJSONString(obj)); if (socketChannel.isOpen()) {
socketChannel.getAsyncRemote().sendText(JSON.toJSONString(obj));
}
} }
@Override @Override
public void close() { public void close() {
try { try {
socketChannel.close(); if (socketChannel.isOpen()) {
socketChannel.close();
}
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }

View File

@ -40,6 +40,31 @@ public class WebSocketServer {
**/ **/
@OnOpen @OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) { public void onOpen(Session session, @PathParam("userId") String userId) {
// 操作api包装
GettySocketOperator gettySocketOperator = new GettySocketOperator(session);
// 回复消息
WebSocketMessageDTO replyMsg = new WebSocketMessageDTO();
replyMsg.setServerMsgType(ServerMessageTypeEnum.SYS_REPLY_MSG_TYPE.getCode());
replyMsg.setToUserId(userId);
try {
// 设置回复内容
replyMsg.setData(session.getId());
// 创建会话对象
SocketSession<GettySocketOperator> socketSession = new SocketSession<>();
socketSession.setSessionId(session.getId());
socketSession.setUserId(userId);
socketSession.setSocketOperatorApi(gettySocketOperator);
socketSession.setConnectionTime(System.currentTimeMillis());
// 维护会话
SessionCenter.addSocketSession(socketSession);
} finally {
// 回复消息
gettySocketOperator.writeAndFlush(replyMsg);
}
} }
/** /**
@ -68,13 +93,16 @@ public class WebSocketServer {
// 转换为Java对象 // 转换为Java对象
WebSocketMessageDTO WebSocketMessageDTO = JSON.parseObject(message, WebSocketMessageDTO.class); WebSocketMessageDTO WebSocketMessageDTO = JSON.parseObject(message, WebSocketMessageDTO.class);
// 维护通道是否已初始化
SocketSession<GettySocketOperator> socketSession = SessionCenter.getSessionBySessionId(socketChannel.getId());
// 心跳包 // 心跳包
if (ClientMessageTypeEnum.USER_HEART.getCode().equals(WebSocketMessageDTO.getClientMsgType())) { if (ObjectUtil.isNotEmpty(socketSession) && ClientMessageTypeEnum.USER_HEART.getCode().equals(WebSocketMessageDTO.getClientMsgType())) {
// 更新会话最后活跃时间 // 更新会话最后活跃时间
SocketSession<GettySocketOperator> session = SessionCenter.getSessionBySessionId(socketChannel.getId()); if (ObjectUtil.isNotEmpty(socketSession)) {
if (ObjectUtil.isNotEmpty(session)) { socketSession.setLastActiveTime(System.currentTimeMillis());
session.setLastActiveTime(System.currentTimeMillis());
} }
return;
} }
// 用户ID为空不处理直接跳过 // 用户ID为空不处理直接跳过
@ -82,40 +110,6 @@ public class WebSocketServer {
return; return;
} }
// 维护通道是否已初始化
SocketSession<GettySocketOperator> socketSession = SessionCenter.getSessionBySessionId(socketChannel.getId());
if (ObjectUtil.isEmpty(socketSession) && ClientMessageTypeEnum.USER_CONNECTION_AUTHENTICATION.getCode().equals(WebSocketMessageDTO.getClientMsgType())) {
// 操作api包装
GettySocketOperator gettySocketOperator = new GettySocketOperator(socketChannel);
// 回复消息
WebSocketMessageDTO replyMsg = new WebSocketMessageDTO();
replyMsg.setServerMsgType(ServerMessageTypeEnum.SYS_REPLY_MSG_TYPE.getCode());
replyMsg.setToUserId(WebSocketMessageDTO.getFormUserId());
try {
// 校验token是否合法
JwtContext.me().validateTokenWithException(WebSocketMessageDTO.getData().toString());
// 设置回复内容
replyMsg.setData(socketChannel.getId());
// 创建会话对象
socketSession = new SocketSession<>();
socketSession.setSessionId(socketChannel.getId());
socketSession.setUserId(WebSocketMessageDTO.getFormUserId());
socketSession.setSocketOperatorApi(gettySocketOperator);
socketSession.setConnectionTime(System.currentTimeMillis());
// 维护会话
SessionCenter.addSocketSession(socketSession);
} finally {
// 回复消息
gettySocketOperator.writeAndFlush(replyMsg);
}
return;
}
// 会话建立成功执行业务逻辑 // 会话建立成功执行业务逻辑
if (ObjectUtil.isNotEmpty(socketSession)) { if (ObjectUtil.isNotEmpty(socketSession)) {
@ -143,7 +137,6 @@ public class WebSocketServer {
**/ **/
@OnError @OnError
public void onError(Session session, Throwable error) { public void onError(Session session, Throwable error) {
log.error("发生错误"); log.error("session 发生错误:" + session.getId());
error.printStackTrace();
} }
} }

View File

@ -4,8 +4,7 @@ import cn.hutool.core.util.ObjectUtil;
import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession; import cn.stylefeng.roses.kernel.socket.api.session.pojo.SocketSession;
import cn.stylefeng.roses.kernel.socket.business.websocket.operator.channel.GettySocketOperator; import cn.stylefeng.roses.kernel.socket.business.websocket.operator.channel.GettySocketOperator;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -102,9 +101,13 @@ public class SessionCenter {
* @date 2021/6/1 3:25 * @date 2021/6/1 3:25
**/ **/
public static void closed(String sessionId) { public static void closed(String sessionId) {
for (List<SocketSession<GettySocketOperator>> values : socketSessionMap.values()) { Set<Map.Entry<String, List<SocketSession<GettySocketOperator>>>> entrySet = socketSessionMap.entrySet();
if (ObjectUtil.isNotEmpty(values)) { Iterator<Map.Entry<String, List<SocketSession<GettySocketOperator>>>> iterator = entrySet.iterator();
values.removeIf(item -> item.getSessionId().equals(sessionId)); while (iterator.hasNext()) {
Map.Entry<String, List<SocketSession<GettySocketOperator>>> next = iterator.next();
List<SocketSession<GettySocketOperator>> value = next.getValue();
if (ObjectUtil.isNotEmpty(value)) {
value.removeIf(gettySocketOperatorSocketSession -> gettySocketOperatorSocketSession.getSessionId().equals(sessionId));
} }
} }
} }

View File

@ -40,7 +40,7 @@
<!--引入WebSocket模块--> <!--引入WebSocket模块-->
<dependency> <dependency>
<groupId>cn.stylefeng.roses</groupId> <groupId>cn.stylefeng.roses</groupId>
<artifactId>socket-sdk-websocket</artifactId> <artifactId>socket-api</artifactId>
<version>${roses.version}</version> <version>${roses.version}</version>
</dependency> </dependency>

View File

@ -43,6 +43,7 @@ import cn.stylefeng.roses.kernel.message.db.service.SysMessageService;
import cn.stylefeng.roses.kernel.rule.enums.YesOrNotEnum; import cn.stylefeng.roses.kernel.rule.enums.YesOrNotEnum;
import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi; import cn.stylefeng.roses.kernel.socket.api.SocketOperatorApi;
import cn.stylefeng.roses.kernel.socket.api.enums.ServerMessageTypeEnum; import cn.stylefeng.roses.kernel.socket.api.enums.ServerMessageTypeEnum;
import cn.stylefeng.roses.kernel.socket.api.exception.SocketException;
import cn.stylefeng.roses.kernel.system.api.UserServiceApi; import cn.stylefeng.roses.kernel.system.api.UserServiceApi;
import cn.stylefeng.roses.kernel.system.api.pojo.user.request.SysUserRequest; import cn.stylefeng.roses.kernel.system.api.pojo.user.request.SysUserRequest;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@ -98,23 +99,28 @@ public class MessageDbServiceImpl implements MessageApi {
} }
Set<Long> userIdSet = new HashSet<>(userIds); Set<Long> userIdSet = new HashSet<>(userIds);
SysMessage sysMessage = new SysMessage(); for (Long userId : userIdSet) {
BeanUtil.copyProperties(messageSendRequest, sysMessage);
// 初始化默认值
sysMessage.setReadFlag(MessageReadFlagEnum.UNREAD.getCode());
sysMessage.setSendUserId(loginUser.getUserId());
userIdSet.forEach(userId -> {
// 判断用户是否存在 // 判断用户是否存在
if (userServiceApi.userExist(userId)) { if (userServiceApi.userExist(userId)) {
SysMessage sysMessage = new SysMessage();
BeanUtil.copyProperties(messageSendRequest, sysMessage);
// 初始化默认值
sysMessage.setReadFlag(MessageReadFlagEnum.UNREAD.getCode());
sysMessage.setSendUserId(loginUser.getUserId());
sysMessage.setReceiveUserId(userId); sysMessage.setReceiveUserId(userId);
sendMsgList.add(sysMessage); sendMsgList.add(sysMessage);
} }
}); }
sysMessageService.saveBatch(sendMsgList); sysMessageService.saveBatch(sendMsgList);
// 给用户发送通知 // 给用户发送通知
for (SysMessage item : sendMsgList) { for (SysMessage item : sendMsgList) {
socketOperatorApi.sendMsgOfUserSession(ServerMessageTypeEnum.SYS_NOTICE_MSG_TYPE.getCode(), item.getReceiveUserId().toString(), item); try {
socketOperatorApi.sendMsgOfUserSession(ServerMessageTypeEnum.SYS_NOTICE_MSG_TYPE.getCode(), item.getReceiveUserId().toString(), item);
} catch (SocketException socketException) {
// 该用户不在线
}
} }
} }

View File

@ -25,6 +25,7 @@
package cn.stylefeng.roses.kernel.system.api; package cn.stylefeng.roses.kernel.system.api;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* api * api
@ -52,4 +53,14 @@ public interface MenuServiceApi {
*/ */
List<String> getUserAppCodeList(); List<String> getUserAppCodeList();
/**
* ID
*
* @param menuIds
* @return {@link java.util.Set<java.lang.Long>}
* @author majianguo
* @date 2021/6/22 10:11
**/
Set<Long> getMenuAllParentMenuId(Set<Long> menuIds);
} }

View File

@ -32,6 +32,7 @@ import cn.stylefeng.roses.kernel.auth.api.context.LoginContext;
import cn.stylefeng.roses.kernel.auth.api.pojo.login.LoginUser; import cn.stylefeng.roses.kernel.auth.api.pojo.login.LoginUser;
import cn.stylefeng.roses.kernel.auth.api.pojo.login.basic.SimpleRoleInfo; import cn.stylefeng.roses.kernel.auth.api.pojo.login.basic.SimpleRoleInfo;
import cn.stylefeng.roses.kernel.db.api.DbOperatorApi; import cn.stylefeng.roses.kernel.db.api.DbOperatorApi;
import cn.stylefeng.roses.kernel.rule.constants.RuleConstants;
import cn.stylefeng.roses.kernel.rule.constants.SymbolConstant; import cn.stylefeng.roses.kernel.rule.constants.SymbolConstant;
import cn.stylefeng.roses.kernel.rule.constants.TreeConstants; import cn.stylefeng.roses.kernel.rule.constants.TreeConstants;
import cn.stylefeng.roses.kernel.rule.enums.StatusEnum; import cn.stylefeng.roses.kernel.rule.enums.StatusEnum;
@ -407,9 +408,7 @@ public class SysMenuServiceImpl extends ServiceImpl<SysMenuMapper, SysMenu> impl
// 菜单查询条件 // 菜单查询条件
LambdaQueryWrapper<SysMenu> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<SysMenu> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(SysMenu::getStatusFlag, StatusEnum.ENABLE.getCode()) queryWrapper.eq(SysMenu::getStatusFlag, StatusEnum.ENABLE.getCode()).eq(SysMenu::getDelFlag, YesOrNotEnum.N.getCode()).orderByAsc(SysMenu::getMenuSort);
.eq(SysMenu::getDelFlag, YesOrNotEnum.N.getCode())
.orderByAsc(SysMenu::getMenuSort);
// 如果应用编码不为空,则拼接应用编码 // 如果应用编码不为空,则拼接应用编码
if (StrUtil.isNotBlank(appCode)) { if (StrUtil.isNotBlank(appCode)) {
@ -463,6 +462,28 @@ public class SysMenuServiceImpl extends ServiceImpl<SysMenuMapper, SysMenu> impl
return list.stream().map(SysMenu::getAppCode).collect(Collectors.toList()); return list.stream().map(SysMenu::getAppCode).collect(Collectors.toList());
} }
@Override
public Set<Long> getMenuAllParentMenuId(Set<Long> menuIds) {
Set<Long> parentMenuIds = new HashSet<>();
// 查询所有菜单信息
List<SysMenu> sysMenus = this.listByIds(menuIds);
if (ObjectUtil.isEmpty(sysMenus)) {
return parentMenuIds;
}
// 获取所有父菜单ID
for (SysMenu sysMenu : sysMenus) {
String menuPids = sysMenu.getMenuPids().replaceAll("\\[", "").replaceAll("\\]", "");
String[] ids = menuPids.split(SymbolConstant.COMMA);
for (String id : ids) {
parentMenuIds.add(Long.parseLong(id));
}
}
return parentMenuIds;
}
/** /**
* *
* *

View File

@ -40,6 +40,7 @@ import cn.stylefeng.roses.kernel.rule.enums.StatusEnum;
import cn.stylefeng.roses.kernel.rule.enums.YesOrNotEnum; import cn.stylefeng.roses.kernel.rule.enums.YesOrNotEnum;
import cn.stylefeng.roses.kernel.rule.exception.base.ServiceException; import cn.stylefeng.roses.kernel.rule.exception.base.ServiceException;
import cn.stylefeng.roses.kernel.rule.pojo.dict.SimpleDict; import cn.stylefeng.roses.kernel.rule.pojo.dict.SimpleDict;
import cn.stylefeng.roses.kernel.system.api.MenuServiceApi;
import cn.stylefeng.roses.kernel.system.api.UserServiceApi; import cn.stylefeng.roses.kernel.system.api.UserServiceApi;
import cn.stylefeng.roses.kernel.system.api.constants.SystemConstants; import cn.stylefeng.roses.kernel.system.api.constants.SystemConstants;
import cn.stylefeng.roses.kernel.system.api.exception.SystemModularException; import cn.stylefeng.roses.kernel.system.api.exception.SystemModularException;
@ -62,6 +63,7 @@ import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -91,6 +93,9 @@ public class SysRoleServiceImpl extends ServiceImpl<SysRoleMapper, SysRole> impl
@Resource @Resource
private SysRoleMenuButtonService sysRoleMenuButtonService; private SysRoleMenuButtonService sysRoleMenuButtonService;
@Resource
private MenuServiceApi menuServiceApi;
@Override @Override
public void add(SysRoleRequest sysRoleRequest) { public void add(SysRoleRequest sysRoleRequest) {
@ -219,9 +224,25 @@ public class SysRoleServiceImpl extends ServiceImpl<SysRoleMapper, SysRole> impl
List<Long> menuIdList = sysRoleMenuButtonRequest.getGrantMenuIdList(); List<Long> menuIdList = sysRoleMenuButtonRequest.getGrantMenuIdList();
if (ObjectUtil.isNotEmpty(menuIdList)) { if (ObjectUtil.isNotEmpty(menuIdList)) {
List<SysRoleMenu> sysRoleMenus = new ArrayList<>(); List<SysRoleMenu> sysRoleMenus = new ArrayList<>();
// 角色ID
Long roleId = sysRoleMenuButtonRequest.getRoleId();
// 查询菜单的所有父菜单
Set<Long> allParentMenuId = menuServiceApi.getMenuAllParentMenuId(new HashSet<>(menuIdList));
// 处理所有父菜单
for (Long menuId : allParentMenuId) {
SysRoleMenu item = new SysRoleMenu();
item.setRoleId(roleId);
item.setMenuId(menuId);
sysRoleMenus.add(item);
}
// 处理菜单本身
for (Long menuId : menuIdList) { for (Long menuId : menuIdList) {
SysRoleMenu item = new SysRoleMenu(); SysRoleMenu item = new SysRoleMenu();
item.setRoleId(sysRoleMenuButtonRequest.getRoleId()); item.setRoleId(roleId);
item.setMenuId(menuId); item.setMenuId(menuId);
sysRoleMenus.add(item); sysRoleMenus.add(item);
} }