Merge remote-tracking branch 'origin/dev-websocket'

pull/3/head
fengshuonan 2021-01-27 21:18:26 +08:00
commit 3103083eac
20 changed files with 470 additions and 11 deletions

View File

@ -1,15 +1,15 @@
package cn.stylefeng.roses.kernel.auth.api.pojo.login;
import cn.hutool.core.lang.Dict;
import cn.hutool.core.util.StrUtil;
import cn.stylefeng.roses.kernel.auth.api.enums.DataScopeTypeEnum;
import cn.stylefeng.roses.kernel.auth.api.pojo.login.basic.SimpleRoleInfo;
import cn.stylefeng.roses.kernel.auth.api.pojo.login.basic.SimpleUserInfo;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
/**
*
@ -89,4 +89,20 @@ public class LoginUser implements Serializable {
*/
private Dict otherInfos;
/**
* ws-url
*/
private String wsUrl;
public String getWsUrl(){
AtomicReference<String> returnUrl = new AtomicReference<>(StrUtil.EMPTY);
Optional.ofNullable(this.wsUrl).ifPresent(url -> {
Map<String, Long> user = new HashMap<>(1);
user.put("userId", this.userId);
returnUrl.set(StrUtil.format(url, user));
});
return returnUrl.get();
}
}

View File

@ -79,6 +79,14 @@
<scope>provided</scope>
</dependency>
<!--系统消息业务模块的api-->
<!--获取当前登录用户的ws-url-->
<dependency>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>message-api</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
</project>

View File

@ -16,6 +16,7 @@ import cn.stylefeng.roses.kernel.jwt.api.context.JwtContext;
import cn.stylefeng.roses.kernel.jwt.api.exception.JwtException;
import cn.stylefeng.roses.kernel.jwt.api.exception.enums.JwtExceptionEnum;
import cn.stylefeng.roses.kernel.jwt.api.pojo.payload.DefaultJwtPayload;
import cn.stylefeng.roses.kernel.message.api.expander.WebSocketConfigExpander;
import cn.stylefeng.roses.kernel.rule.util.HttpServletUtil;
import cn.stylefeng.roses.kernel.system.LoginLogServiceApi;
import cn.stylefeng.roses.kernel.system.UserServiceApi;
@ -213,6 +214,9 @@ public class AuthServiceImpl implements AuthServiceApi {
synchronized (SESSION_OPERATE_LOCK) {
// 8.1 获取ws-url 保存到用户信息中
loginUser.setWsUrl(WebSocketConfigExpander.getWebSocketWsUrl());
// 9. 缓存用户信息,创建会话
sessionManagerApi.createSession(jwtToken, loginUser);

View File

@ -37,26 +37,38 @@ public class FlywayInitListener implements ApplicationListener<ApplicationContex
String dataSourcePassword = environment.getProperty("spring.datasource.password");
// flyway的配置
String enabledStr = environment.getProperty("spring.flyway.enabled");
String locations = environment.getProperty("spring.flyway.locations");
String baselineOnMigrateStr = environment.getProperty("spring.flyway.baseline-on-migrate");
String outOfOrderStr = environment.getProperty("spring.flyway.out-of-order");
// 是否开启flyway默认false.
boolean enabled = false;
if (StrUtil.isNotBlank(enabledStr)) {
enabled = Boolean.parseBoolean(enabledStr);
}
// 如果未开启flyway 直接return
if (!enabled) {
return;
}
// 如果有为空的配置,终止执行
if (ObjectUtil.hasEmpty(dataSourceUrl, dataSourceUsername, dataSourcePassword)) {
throw new DaoException(FlywayExceptionEnum.DB_CONFIG_ERROR);
}
// 如果未设置flyway路径则设置为默认flyway路径
if (StrUtil.isBlank(locations)) {
locations = FLYWAY_LOCATIONS;
}
// 当迁移时发现目标schema非空而且带有没有元数据的表时是否自动执行基准迁移默认false.
boolean baselineOnMigrate = false;
if (StrUtil.isNotBlank(baselineOnMigrateStr)) {
baselineOnMigrate = Boolean.parseBoolean(baselineOnMigrateStr);
}
// 如果未设置flyway路径则设置为默认flyway路径
if (StrUtil.isBlank(locations)) {
locations = FLYWAY_LOCATIONS;
}
// 是否允许无序的迁移 开发环境最好开启, 生产环境关闭
boolean outOfOrder = false;
if (StrUtil.isNotBlank(outOfOrderStr)) {

View File

@ -50,7 +50,7 @@ public class GunsLogAutoConfiguration {
}
/**
*
*
*
* @author liuhanqing
* @date 2020/12/20 14:17

View File

@ -0,0 +1,29 @@
package cn.stylefeng.roses.kernel.message.api;
import cn.stylefeng.roses.kernel.db.api.pojo.page.PageResult;
import cn.stylefeng.roses.kernel.message.api.enums.MessageReadFlagEnum;
import cn.stylefeng.roses.kernel.message.api.pojo.MessageParam;
import cn.stylefeng.roses.kernel.message.api.pojo.MessageResponse;
import cn.stylefeng.roses.kernel.message.api.pojo.MessageSendParam;
import java.util.List;
/**
* websocket
*
* @author liuhanqing
* @date 2021/1/26 18:14
*/
public interface WebsocketApi {
/**
* websocket
*
* @param userIdList userId
* @param messageSendParam
* @author liuhanqing
* @date 2021/1/26 18:17
*/
void sendWebSocketMessage(List<Long> userIdList, MessageSendParam messageSendParam);
}

View File

@ -0,0 +1,37 @@
package cn.stylefeng.roses.kernel.message.api.expander;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.stylefeng.roses.kernel.config.api.context.ConfigContext;
/**
* websocket
*
* @author liuhanqing
* @date 2021/1/25 20:05
*/
public class WebSocketConfigExpander {
/**
* websocketws-url
*
* @author liuhanqing
* @date 2021/1/25 20:34
*/
public static String getWebSocketWsUrl() {
String webSocketWsUr = ConfigContext.me().getConfigValueNullable("WEB_SOCKET_WS_URL", String.class);
if (webSocketWsUr == null) {
// 没配置就查询配置文件
String propertiesUrl = SpringUtil.getProperty("web-socket.ws-url");
if(StrUtil.isNotEmpty(propertiesUrl)){
return propertiesUrl;
}
// 没配置就返回一个空串
return StrUtil.EMPTY;
}
return webSocketWsUr;
}
}

View File

@ -1,11 +1,13 @@
package cn.stylefeng.roses.kernel.message.api.pojo;
import cn.stylefeng.roses.kernel.rule.pojo.request.BaseRequest;
import com.baomidou.mybatisplus.annotation.TableField;
import lombok.Data;
import lombok.EqualsAndHashCode;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.Date;
/**
*
@ -56,4 +58,9 @@ public class MessageSendParam extends BaseRequest {
@NotBlank(message = "业务类型不能为空", groups = {add.class, edit.class})
private String businessType;
/**
*
*/
private Date messageSendTime;
}

View File

@ -0,0 +1,24 @@
package cn.stylefeng.roses.kernel.message.api.pojo;
import lombok.Data;
/**
*
*
* @author liuhanqing
* @date 2021/1/25 9:25
*/
@Data
public class MessageWebSocketProperties {
/**
* websocket
*/
private Boolean open = true ;
/**
* websocket
*/
private String wsUrl = "";
}

View File

@ -15,6 +15,7 @@ import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -43,6 +44,7 @@ public class SysMessageController {
*/
@PostResource(name = "发送系统消息", path = "/sysMessage/sendMessage")
public ResponseData sendMessage(@RequestBody @Validated(MessageSendParam.add.class) MessageSendParam messageSendParam) {
messageSendParam.setMessageSendTime(new Date());
messageApi.sendMessage(messageSendParam);
return new SuccessResponseData();
}

View File

@ -1,12 +1,14 @@
package cn.stylefeng.roses.kernel.message.db;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import cn.stylefeng.roses.kernel.auth.api.context.LoginContext;
import cn.stylefeng.roses.kernel.auth.api.pojo.login.LoginUser;
import cn.stylefeng.roses.kernel.db.api.pojo.page.PageResult;
import cn.stylefeng.roses.kernel.message.api.MessageApi;
import cn.stylefeng.roses.kernel.message.api.WebsocketApi;
import cn.stylefeng.roses.kernel.message.api.constants.MessageConstants;
import cn.stylefeng.roses.kernel.message.api.enums.MessageReadFlagEnum;
import cn.stylefeng.roses.kernel.message.api.exception.MessageException;
@ -39,6 +41,9 @@ import java.util.stream.Collectors;
public class MessageDbServiceImpl implements MessageApi {
@Resource
private WebsocketApi websocketApi;
@Resource
private UserServiceApi userServiceApi;
@ -57,7 +62,6 @@ public class MessageDbServiceImpl implements MessageApi {
if (MessageConstants.RECEIVE_ALL_USER_FLAG.equals(receiveUserIds)) {
// 查询所有用户
userIds = userServiceApi.queryAllUserIdList(new SysUserRequest());
} else {
String[] userIdArr = receiveUserIds.split(",");
userIds = Convert.toList(Long.class, userIdArr);
@ -72,7 +76,6 @@ public class MessageDbServiceImpl implements MessageApi {
// 初始化默认值
sysMessage.setReadFlag(MessageReadFlagEnum.UNREAD.getCode());
sysMessage.setSendUserId(loginUser.getUserId());
sysMessage.setMessageSendTime(new Date());
userIdSet.forEach(userId -> {
// 判断用户是否存在
if (userServiceApi.userExist(userId)) {
@ -80,6 +83,8 @@ public class MessageDbServiceImpl implements MessageApi {
sendMsgList.add(sysMessage);
}
});
websocketApi.sendWebSocketMessage(ListUtil.toList(userIdSet), messageSendParam);
sysMessageService.saveBatch(sendMsgList);
}

View File

@ -0,0 +1 @@
系统消息websocket的sdk用于将消息发送给在线用户并提供相应接口

View File

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>kernel-s-message</artifactId>
<version>1.0.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>message-sdk-websocket</artifactId>
<packaging>jar</packaging>
<dependencies>
<!--消息模块的api-->
<dependency>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>message-api</artifactId>
<version>1.0.0</version>
</dependency>
<!--websocket 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>auth-api</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,55 @@
package cn.stylefeng.roses.kernel.message.websocket;
import cn.hutool.core.bean.BeanUtil;
import cn.stylefeng.roses.kernel.auth.api.context.LoginContext;
import cn.stylefeng.roses.kernel.auth.api.pojo.login.LoginUser;
import cn.stylefeng.roses.kernel.message.api.WebsocketApi;
import cn.stylefeng.roses.kernel.message.api.enums.MessageReadFlagEnum;
import cn.stylefeng.roses.kernel.message.api.pojo.MessageResponse;
import cn.stylefeng.roses.kernel.message.api.pojo.MessageSendParam;
import cn.stylefeng.roses.kernel.message.websocket.manager.WebSocketManager;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
/**
* websocket
*
* @author liuhanqing
* @date 2021/1/2 22:00
*/
@Slf4j
@Service
public class WebSocketServiceImpl implements WebsocketApi {
public final static ObjectMapper MAPPER;
static {
MAPPER = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
@Override
public void sendWebSocketMessage(List<Long> userIdList, MessageSendParam messageSendParam) {
// 获取当前登录人
LoginUser loginUser = LoginContext.me().getLoginUser();
try {
MessageResponse sysMessage = new MessageResponse();
BeanUtil.copyProperties(messageSendParam, sysMessage);
sysMessage.setReadFlag(MessageReadFlagEnum.UNREAD.getCode());
sysMessage.setSendUserId(loginUser.getUserId());
String msgInfo = MAPPER.writeValueAsString(sysMessage);
for (Long userId : userIdList) {
WebSocketManager.sendMessage(userId, msgInfo);
}
} catch (JsonProcessingException e) {
log.error("发送websocket异常", e);
}
}
}

View File

@ -0,0 +1,95 @@
package cn.stylefeng.roses.kernel.message.websocket.manager;
import org.springframework.util.CollectionUtils;
import javax.websocket.Session;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author liuhq
*/
public class WebSocketManager {
private static final ConcurrentHashMap<Long, List<Session>> userIdSessionMap = new ConcurrentHashMap<>();
/**
* IDSession
*
* @param userId id
* @param session websocketSession
* @author liuhanqing
* @date 2021/1/24 22:08
*/
public static void add(Long userId, Session session) {
userIdSessionMap.computeIfAbsent(userId, v -> new ArrayList<>()).add(session);
}
/**
* IDSession
*
* @param userId id
* @return List<Session> websocketSession
* @author liuhanqing
* @date 2021/1/24 22:10
*/
public static List<Session> getSessionByUserId(Long userId) {
return userIdSessionMap.get(userId);
}
/**
* Session
*
* @param userId id
* @param session websocketSession
* @author liuhanqing
* @date 2021/1/24 22:11
*/
public static void removeSession(Long userId, Session session) {
if (session == null) {
return;
}
List<Session> webSessoin = userIdSessionMap.get(userId);
if (webSessoin == null || CollectionUtils.isEmpty(webSessoin)) {
return;
}
webSessoin.remove(session);
}
/**
*
*
* @author liuhanqing
* @date 2021/1/24 22:11
*/
public static Set<Long> getUserList() {
return userIdSessionMap.keySet();
}
/**
*
*
* @param userId id
* @param message
* @author liuhanqing
* @date 2021/1/24 22:11
*/
public static void sendMessage(Long userId, String message){
for(Session userSession: getSessionByUserId(userId)){
userSession.getAsyncRemote().sendText(message);
}
}
/**
*
*
* @param message
* @author liuhanqing
* @date 2021/1/24 22:11
*/
public static void sendMessageToAll(String message){
for (Long userId : WebSocketManager.getUserList()) {
sendMessage(userId, message);
}
}
}

View File

@ -0,0 +1,78 @@
package cn.stylefeng.roses.kernel.message.websocket.server;
import cn.stylefeng.roses.kernel.message.websocket.manager.WebSocketManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
/**
* websocket
*
* @author liuhanqing
* @date 2021/1/24 22:26
*/
@Slf4j
@Component
@ServerEndpoint("/message/websocket/{userId}")
public class WebSocketEndpoint {
/**
*
*
* @param userId id
* @param session websocketSession
* @author liuhanqing
* @date 2021/1/24 22:27
*/
@OnOpen
public void onOpen(@PathParam(value = "userId") Long userId, Session session) {
// 添加到链接管理
WebSocketManager.add(userId, session);
// 返回消息
// session.getAsyncRemote().sendText("WebSocket连接成功");
}
/**
*
*
* @author liuhanqing
* @date 2021/1/24 22:29
*/
@OnClose
public void onClose(@PathParam(value = "userId") Long userId, Session session) {
// 从map中删除
WebSocketManager.removeSession(userId, session);
}
/**
*
*
* @param message
* @param session websocketSession
* @author liuhanqing
* @date 2021/1/24 22:29
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("来自客户端的消息:" + message);
}
/**
*
*
* @param session
* @param error
* @author liuhanqing
* @date 2021/1/24 22:29
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("WebSocket发生错误");
error.printStackTrace();
}
}

View File

@ -32,6 +32,11 @@
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>cn.stylefeng.roses</groupId>
<artifactId>message-sdk-websocket</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>

View File

@ -1,6 +1,13 @@
package cn.stylefeng.roses.kernel.message.starter;
import cn.stylefeng.roses.kernel.message.api.expander.WebSocketConfigExpander;
import cn.stylefeng.roses.kernel.message.api.pojo.MessageWebSocketProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
*
@ -11,6 +18,36 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class GunsMessageAutoConfiguration {
public static final String WEB_SOCKET_PREFIX = "web-socket";
/**
* websocket
*
* @author liuhanqing
* @date 2021/1/25 9:29
*/
@Bean
@ConfigurationProperties(prefix = WEB_SOCKET_PREFIX)
public MessageWebSocketProperties messageWebSocketProperties() {
MessageWebSocketProperties properties = new MessageWebSocketProperties();
properties.setWsUrl(WebSocketConfigExpander.getWebSocketWsUrl());
return properties;
}
/**
* WebSocket
*
* @return serverEndpointExporter
* @author liuhanqing
* @date 2021/01/24 22:09
*/
@Bean
@ConditionalOnMissingBean(ServerEndpointExporter.class)
@ConditionalOnProperty(prefix = WEB_SOCKET_PREFIX, name = "open", havingValue = "true")
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

View File

@ -19,6 +19,7 @@
<module>message-api</module>
<module>message-business</module>
<module>message-sdk-db</module>
<module>message-sdk-websocket</module>
<module>message-spring-boot-starter</module>
</modules>

View File

@ -22,6 +22,7 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
/**
@ -56,6 +57,7 @@ public class SysNoticeServiceImpl extends ServiceImpl<SysNoticeMapper, SysNotice
// 消息业务类型
message.setBusinessType(MessageBusinessTypeEnum.SYS_NOTICE.getCode());
message.setBusinessId(sysNotice.getNoticeId());
message.setMessageSendTime(new Date());
messageApi.sendMessage(message);
}