diff --git a/snowy-plugin-api/snowy-plugin-dev-api/src/main/java/vip/xiaonuo/dev/api/DevSseApi.java b/snowy-plugin-api/snowy-plugin-dev-api/src/main/java/vip/xiaonuo/dev/api/DevSseApi.java index f3b2a08a..9b3ee312 100644 --- a/snowy-plugin-api/snowy-plugin-dev-api/src/main/java/vip/xiaonuo/dev/api/DevSseApi.java +++ b/snowy-plugin-api/snowy-plugin-dev-api/src/main/java/vip/xiaonuo/dev/api/DevSseApi.java @@ -28,13 +28,14 @@ public interface DevSseApi { * 创建SSE连接 * * @param clientId 客户端id,不传则自动生成 - * @param setHeartBeat 是否设置自定义心跳定时任务,默认为false(true:设置 false:不设置) - * @param consumer 自定义心跳任务,需要自定义实现Consumer接口中的accept方法(setHeartBeat必须为true才有意义) + * @param setHeartBeat 是否设置心跳定时任务,默认为false(true:设置 false:不设置) + * @param defaultHeartbeat 是否使用默认心跳任务 + * @param consumer 自定义心跳任务,需要自定义实现Consumer接口中的accept方法(setHeartBeat必须为true,defaultHeartbeat为false才有意义) * @return 初次建立连接会推送客户端id,状态码为0 * @author diantu * @date 2023/7/5 **/ - public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer consumer); + public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer consumer); /** * 关闭连接 diff --git a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/controller/DevSseEmitterController.java b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/controller/DevSseEmitterController.java index ff0b2755..13440d07 100644 --- a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/controller/DevSseEmitterController.java +++ b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/controller/DevSseEmitterController.java @@ -48,9 +48,11 @@ public class DevSseEmitterController { @ApiOperationSupport(order = 1) @ApiOperation("创建sse连接") @GetMapping("/dev/sse/createConnect") - public SseEmitter createConnect(String clientId, @RequestParam(required = false)Boolean setHeartBeat, + public SseEmitter createConnect(String clientId, + @RequestParam(required = false)Boolean setHeartBeat, + @RequestParam(required = false)Boolean defaultHeartbeat, @RequestParam(required = false)Consumer consumer){ - return devSseEmitterService.createSseConnect(clientId,setHeartBeat,consumer); + return devSseEmitterService.createSseConnect(clientId,setHeartBeat,defaultHeartbeat,consumer); } /** diff --git a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/provider/DevSseProvider.java b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/provider/DevSseProvider.java index 65b18e0c..4051eb91 100644 --- a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/provider/DevSseProvider.java +++ b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/provider/DevSseProvider.java @@ -39,8 +39,8 @@ public class DevSseProvider implements DevSseApi { * @date 2023/7/5 **/ @Override - public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer consumer) { - return devSseEmitterService.createSseConnect(clientId,setHeartBeat,consumer); + public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer consumer) { + return devSseEmitterService.createSseConnect(clientId,setHeartBeat,defaultHeartbeat,consumer); } /** diff --git a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/DevSseEmitterService.java b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/DevSseEmitterService.java index 2aeceb90..4c57c81b 100644 --- a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/DevSseEmitterService.java +++ b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/DevSseEmitterService.java @@ -30,7 +30,7 @@ public interface DevSseEmitterService { * @author diantu * @date 2023/7/3 **/ - public SseEmitter createSseConnect(String clientId,Boolean setHeartBeat, Consumer consumer); + public SseEmitter createSseConnect(String clientId,Boolean setHeartBeat,Boolean defaultHeartbeat,Consumer consumer); /** * 关闭连接 diff --git a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/impl/DevSseEmitterServiceImpl.java b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/impl/DevSseEmitterServiceImpl.java index c481dc66..bd213151 100644 --- a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/impl/DevSseEmitterServiceImpl.java +++ b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/impl/DevSseEmitterServiceImpl.java @@ -47,7 +47,7 @@ public class DevSseEmitterServiceImpl implements DevSseEmitterService { * @date 2023/7/3 **/ @Override - public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer consumer) { + public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer consumer) { // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException SseEmitter sseEmitter = new SseEmitter(0L); String loginId = StpUtil.getLoginIdAsString(); @@ -63,25 +63,32 @@ public class DevSseEmitterServiceImpl implements DevSseEmitterService { final ScheduledFuture future; // 是否自定义心跳任务 if (setHeartBeat!=null&&setHeartBeat) { - CommonSseParam commonSseParam = new CommonSseParam(); - commonSseParam.setClientId(clientId); - commonSseParam.setLoginId(loginId); - future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam), - 2, 10, TimeUnit.SECONDS); + //是否使用默认心跳任务 + if(defaultHeartbeat!=null&&defaultHeartbeat){ + //默认心跳任务 + future = heartbeatExecutors.scheduleAtFixedRate(() -> + DevSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId), + 2, 10, TimeUnit.SECONDS); + }else{ + //自定义心跳任务 + CommonSseParam commonSseParam = new CommonSseParam(); + commonSseParam.setClientId(clientId); + commonSseParam.setLoginId(loginId); + future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam), + 2, 10, TimeUnit.SECONDS); + } + // 增加连接 + DevSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future); } else { - //默认心跳任务 - future = heartbeatExecutors.scheduleAtFixedRate(() -> - DevSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId), - 2, 10, TimeUnit.SECONDS); + // 增加连接 + DevSseCacheUtil.addConnection(clientId, loginId, sseEmitter, null); } // 长链接完成后回调(即关闭连接时调用) - sseEmitter.onCompletion(DevSseCacheUtil.completionCallBack(clientId,future)); + sseEmitter.onCompletion(DevSseCacheUtil.completionCallBack(clientId)); // 连接超时回调 - sseEmitter.onTimeout(DevSseCacheUtil.timeoutCallBack(clientId,future)); + sseEmitter.onTimeout(DevSseCacheUtil.timeoutCallBack(clientId)); // 推送消息异常回调 - sseEmitter.onError(DevSseCacheUtil.errorCallBack(clientId,future)); - // 增加连接 - DevSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future); + sseEmitter.onError(DevSseCacheUtil.errorCallBack(clientId)); // 初次建立连接,推送客户端id CommonResult message = new CommonResult<>(0,"",clientId); DevSseCacheUtil.sendMessageToClientByClientId(clientId,message); diff --git a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/util/DevSseCacheUtil.java b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/util/DevSseCacheUtil.java index 6a1673d4..13bd681d 100644 --- a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/util/DevSseCacheUtil.java +++ b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/util/DevSseCacheUtil.java @@ -55,6 +55,53 @@ public class DevSseCacheUtil { return (SseEmitter) map.get(DevSseEmitterParameterEnum.EMITTER.getValue()); } + /** + * 根据客户端id获取心跳 + * + * @author diantu + * @date 2023/7/18 + **/ + public static ScheduledFuture getSseFutureByClientId(String clientId) { + Map map = sseCache.get(clientId); + if (map == null || map.isEmpty()) { + return null; + } + return (ScheduledFuture) map.get(DevSseEmitterParameterEnum.FUTURE.getValue()); + } + + /** + * 根据客户端id获取用户id + * + * @author diantu + * @date 2023/7/18 + **/ + public static ScheduledFuture getLoginIdByClientId(String clientId) { + Map map = sseCache.get(clientId); + if (map == null || map.isEmpty()) { + return null; + } + return (ScheduledFuture) map.get(DevSseEmitterParameterEnum.LOGINID.getValue()); + } + + /** + * 根据用户id获取客户端id + * + * @author diantu + * @date 2023/7/18 + **/ + public static String getClientIdByLoginId(String loginId){ + if(existSseCache()){ + for (Map.Entry> entry : sseCache.entrySet()) { + Map map = sseCache.get(entry.getKey()); + String lId = (String) map.get(DevSseEmitterParameterEnum.LOGINID.getValue()); + if(loginId.equals(lId)){ + return entry.getKey(); + } + } + } + return null; + } + /** * 判断容器是否存在连接 * @@ -105,7 +152,7 @@ public class DevSseCacheUtil { public static void removeConnection(String clientId) { SseEmitter emitter = getSseEmitterByClientId(clientId); if (emitter != null) { - cancelScheduledFuture((ScheduledFuture) sseCache.get(clientId).get(DevSseEmitterParameterEnum.FUTURE.getValue())); + cancelScheduledFuture(clientId); } sseCache.remove(clientId); log.info("移除连接:{}", clientId); @@ -117,7 +164,8 @@ public class DevSseCacheUtil { * @author diantu * @date 2023/7/3 */ - public static void cancelScheduledFuture(ScheduledFuture future){ + public static void cancelScheduledFuture(String clientId){ + ScheduledFuture future = getSseFutureByClientId(clientId); if (future != null) { future.cancel(true); } @@ -130,11 +178,11 @@ public class DevSseCacheUtil { * @author diantu * @date 2023/7/3 **/ - public static Runnable completionCallBack(String clientId, ScheduledFuture future) { + public static Runnable completionCallBack(String clientId) { return () -> { log.info("结束连接:{}", clientId); removeConnection(clientId); - cancelScheduledFuture(future); + cancelScheduledFuture(clientId); }; } @@ -144,11 +192,11 @@ public class DevSseCacheUtil { * @author diantu * @date 2023/7/3 **/ - public static Runnable timeoutCallBack(String clientId, ScheduledFuture future){ + public static Runnable timeoutCallBack(String clientId){ return ()->{ log.info("连接超时:{}", clientId); removeConnection(clientId); - cancelScheduledFuture(future); + cancelScheduledFuture(clientId); }; } @@ -158,11 +206,11 @@ public class DevSseCacheUtil { * @author diantu * @date 2023/7/3 **/ - public static Consumer errorCallBack(String clientId, ScheduledFuture future) { + public static Consumer errorCallBack(String clientId) { return throwable -> { log.info("推送消息异常:{}", clientId); removeConnection(clientId); - cancelScheduledFuture(future); + cancelScheduledFuture(clientId); }; } diff --git a/snowy-plugin/snowy-plugin-sys/src/main/java/vip/xiaonuo/sys/modular/index/service/impl/SysIndexServiceImpl.java b/snowy-plugin/snowy-plugin-sys/src/main/java/vip/xiaonuo/sys/modular/index/service/impl/SysIndexServiceImpl.java index 1a8c6027..6577406e 100644 --- a/snowy-plugin/snowy-plugin-sys/src/main/java/vip/xiaonuo/sys/modular/index/service/impl/SysIndexServiceImpl.java +++ b/snowy-plugin/snowy-plugin-sys/src/main/java/vip/xiaonuo/sys/modular/index/service/impl/SysIndexServiceImpl.java @@ -125,6 +125,6 @@ public class SysIndexServiceImpl implements SysIndexService { //发送消息 devSseApi.sendMessageToOneClient(m.getClientId(), String.valueOf(unreadMessageNum)); }; - return devSseApi.createSseConnect(clientId,true,consumer); + return devSseApi.createSseConnect(clientId,true,false,consumer); } }