From d77945a4279b332d93d7b1932f3755964ff6682d Mon Sep 17 00:00:00 2001 From: diant Date: Tue, 18 Jul 2023 16:11:43 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E6=96=B0=E5=A2=9E=E3=80=91=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0SSE=E6=B6=88=E6=81=AF=E6=8E=A8=E9=80=81=E6=A8=A1?= =?UTF-8?q?=E5=9D=97,=E5=AE=9E=E7=8E=B0=E5=8F=B3=E4=B8=8A=E8=A7=92?= =?UTF-8?q?=E7=AB=99=E5=86=85=E4=BF=A1=E5=BE=BD=E6=A0=87=E6=95=B0=E6=8E=A8?= =?UTF-8?q?=E9=80=81=20=E3=80=90=E4=BF=AE=E5=A4=8D=E3=80=91=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E4=B8=AA=E4=BA=BA=E4=B8=AD=E5=BF=83-=E6=88=91?= =?UTF-8?q?=E7=9A=84=E6=B6=88=E6=81=AF=20=E5=85=B3=E9=97=AD=E8=AF=A6?= =?UTF-8?q?=E6=83=85=E9=A1=B5=E6=98=AF=E5=90=A6=E5=B7=B2=E8=AF=BB=E6=9C=AA?= =?UTF-8?q?=E5=88=B7=E6=96=B0=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/vip/xiaonuo/dev/api/DevSseApi.java | 7 +- .../controller/DevSseEmitterController.java | 6 +- .../modular/sse/provider/DevSseProvider.java | 4 +- .../sse/service/DevSseEmitterService.java | 2 +- .../impl/DevSseEmitterServiceImpl.java | 37 ++++++----- .../dev/modular/sse/util/DevSseCacheUtil.java | 64 ++++++++++++++++--- .../service/impl/SysIndexServiceImpl.java | 2 +- 7 files changed, 90 insertions(+), 32 deletions(-) diff --git a/snowy-plugin-api/snowy-plugin-dev-api/src/main/java/vip/xiaonuo/dev/api/DevSseApi.java b/snowy-plugin-api/snowy-plugin-dev-api/src/main/java/vip/xiaonuo/dev/api/DevSseApi.java index f3b2a08a..9b3ee312 100644 --- a/snowy-plugin-api/snowy-plugin-dev-api/src/main/java/vip/xiaonuo/dev/api/DevSseApi.java +++ b/snowy-plugin-api/snowy-plugin-dev-api/src/main/java/vip/xiaonuo/dev/api/DevSseApi.java @@ -28,13 +28,14 @@ public interface DevSseApi { * 创建SSE连接 * * @param clientId 客户端id,不传则自动生成 - * @param setHeartBeat 是否设置自定义心跳定时任务,默认为false(true:设置 false:不设置) - * @param consumer 自定义心跳任务,需要自定义实现Consumer接口中的accept方法(setHeartBeat必须为true才有意义) + * @param setHeartBeat 是否设置心跳定时任务,默认为false(true:设置 false:不设置) + * @param defaultHeartbeat 是否使用默认心跳任务 + * @param consumer 自定义心跳任务,需要自定义实现Consumer接口中的accept方法(setHeartBeat必须为true,defaultHeartbeat为false才有意义) * @return 初次建立连接会推送客户端id,状态码为0 * @author diantu * @date 2023/7/5 **/ - public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer consumer); + public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer consumer); /** * 关闭连接 diff --git a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/controller/DevSseEmitterController.java b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/controller/DevSseEmitterController.java index ff0b2755..13440d07 100644 --- a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/controller/DevSseEmitterController.java +++ b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/controller/DevSseEmitterController.java @@ -48,9 +48,11 @@ public class DevSseEmitterController { @ApiOperationSupport(order = 1) @ApiOperation("创建sse连接") @GetMapping("/dev/sse/createConnect") - public SseEmitter createConnect(String clientId, @RequestParam(required = false)Boolean setHeartBeat, + public SseEmitter createConnect(String clientId, + @RequestParam(required = false)Boolean setHeartBeat, + @RequestParam(required = false)Boolean defaultHeartbeat, @RequestParam(required = false)Consumer consumer){ - return devSseEmitterService.createSseConnect(clientId,setHeartBeat,consumer); + return devSseEmitterService.createSseConnect(clientId,setHeartBeat,defaultHeartbeat,consumer); } /** diff --git a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/provider/DevSseProvider.java b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/provider/DevSseProvider.java index 65b18e0c..4051eb91 100644 --- a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/provider/DevSseProvider.java +++ b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/provider/DevSseProvider.java @@ -39,8 +39,8 @@ public class DevSseProvider implements DevSseApi { * @date 2023/7/5 **/ @Override - public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer consumer) { - return devSseEmitterService.createSseConnect(clientId,setHeartBeat,consumer); + public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer consumer) { + return devSseEmitterService.createSseConnect(clientId,setHeartBeat,defaultHeartbeat,consumer); } /** diff --git a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/DevSseEmitterService.java b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/DevSseEmitterService.java index 2aeceb90..4c57c81b 100644 --- a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/DevSseEmitterService.java +++ b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/DevSseEmitterService.java @@ -30,7 +30,7 @@ public interface DevSseEmitterService { * @author diantu * @date 2023/7/3 **/ - public SseEmitter createSseConnect(String clientId,Boolean setHeartBeat, Consumer consumer); + public SseEmitter createSseConnect(String clientId,Boolean setHeartBeat,Boolean defaultHeartbeat,Consumer consumer); /** * 关闭连接 diff --git a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/impl/DevSseEmitterServiceImpl.java b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/impl/DevSseEmitterServiceImpl.java index c481dc66..bd213151 100644 --- a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/impl/DevSseEmitterServiceImpl.java +++ b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/service/impl/DevSseEmitterServiceImpl.java @@ -47,7 +47,7 @@ public class DevSseEmitterServiceImpl implements DevSseEmitterService { * @date 2023/7/3 **/ @Override - public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer consumer) { + public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer consumer) { // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException SseEmitter sseEmitter = new SseEmitter(0L); String loginId = StpUtil.getLoginIdAsString(); @@ -63,25 +63,32 @@ public class DevSseEmitterServiceImpl implements DevSseEmitterService { final ScheduledFuture future; // 是否自定义心跳任务 if (setHeartBeat!=null&&setHeartBeat) { - CommonSseParam commonSseParam = new CommonSseParam(); - commonSseParam.setClientId(clientId); - commonSseParam.setLoginId(loginId); - future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam), - 2, 10, TimeUnit.SECONDS); + //是否使用默认心跳任务 + if(defaultHeartbeat!=null&&defaultHeartbeat){ + //默认心跳任务 + future = heartbeatExecutors.scheduleAtFixedRate(() -> + DevSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId), + 2, 10, TimeUnit.SECONDS); + }else{ + //自定义心跳任务 + CommonSseParam commonSseParam = new CommonSseParam(); + commonSseParam.setClientId(clientId); + commonSseParam.setLoginId(loginId); + future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam), + 2, 10, TimeUnit.SECONDS); + } + // 增加连接 + DevSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future); } else { - //默认心跳任务 - future = heartbeatExecutors.scheduleAtFixedRate(() -> - DevSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId), - 2, 10, TimeUnit.SECONDS); + // 增加连接 + DevSseCacheUtil.addConnection(clientId, loginId, sseEmitter, null); } // 长链接完成后回调(即关闭连接时调用) - sseEmitter.onCompletion(DevSseCacheUtil.completionCallBack(clientId,future)); + sseEmitter.onCompletion(DevSseCacheUtil.completionCallBack(clientId)); // 连接超时回调 - sseEmitter.onTimeout(DevSseCacheUtil.timeoutCallBack(clientId,future)); + sseEmitter.onTimeout(DevSseCacheUtil.timeoutCallBack(clientId)); // 推送消息异常回调 - sseEmitter.onError(DevSseCacheUtil.errorCallBack(clientId,future)); - // 增加连接 - DevSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future); + sseEmitter.onError(DevSseCacheUtil.errorCallBack(clientId)); // 初次建立连接,推送客户端id CommonResult message = new CommonResult<>(0,"",clientId); DevSseCacheUtil.sendMessageToClientByClientId(clientId,message); diff --git a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/util/DevSseCacheUtil.java b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/util/DevSseCacheUtil.java index 6a1673d4..13bd681d 100644 --- a/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/util/DevSseCacheUtil.java +++ b/snowy-plugin/snowy-plugin-dev/src/main/java/vip/xiaonuo/dev/modular/sse/util/DevSseCacheUtil.java @@ -55,6 +55,53 @@ public class DevSseCacheUtil { return (SseEmitter) map.get(DevSseEmitterParameterEnum.EMITTER.getValue()); } + /** + * 根据客户端id获取心跳 + * + * @author diantu + * @date 2023/7/18 + **/ + public static ScheduledFuture getSseFutureByClientId(String clientId) { + Map map = sseCache.get(clientId); + if (map == null || map.isEmpty()) { + return null; + } + return (ScheduledFuture) map.get(DevSseEmitterParameterEnum.FUTURE.getValue()); + } + + /** + * 根据客户端id获取用户id + * + * @author diantu + * @date 2023/7/18 + **/ + public static ScheduledFuture getLoginIdByClientId(String clientId) { + Map map = sseCache.get(clientId); + if (map == null || map.isEmpty()) { + return null; + } + return (ScheduledFuture) map.get(DevSseEmitterParameterEnum.LOGINID.getValue()); + } + + /** + * 根据用户id获取客户端id + * + * @author diantu + * @date 2023/7/18 + **/ + public static String getClientIdByLoginId(String loginId){ + if(existSseCache()){ + for (Map.Entry> entry : sseCache.entrySet()) { + Map map = sseCache.get(entry.getKey()); + String lId = (String) map.get(DevSseEmitterParameterEnum.LOGINID.getValue()); + if(loginId.equals(lId)){ + return entry.getKey(); + } + } + } + return null; + } + /** * 判断容器是否存在连接 * @@ -105,7 +152,7 @@ public class DevSseCacheUtil { public static void removeConnection(String clientId) { SseEmitter emitter = getSseEmitterByClientId(clientId); if (emitter != null) { - cancelScheduledFuture((ScheduledFuture) sseCache.get(clientId).get(DevSseEmitterParameterEnum.FUTURE.getValue())); + cancelScheduledFuture(clientId); } sseCache.remove(clientId); log.info("移除连接:{}", clientId); @@ -117,7 +164,8 @@ public class DevSseCacheUtil { * @author diantu * @date 2023/7/3 */ - public static void cancelScheduledFuture(ScheduledFuture future){ + public static void cancelScheduledFuture(String clientId){ + ScheduledFuture future = getSseFutureByClientId(clientId); if (future != null) { future.cancel(true); } @@ -130,11 +178,11 @@ public class DevSseCacheUtil { * @author diantu * @date 2023/7/3 **/ - public static Runnable completionCallBack(String clientId, ScheduledFuture future) { + public static Runnable completionCallBack(String clientId) { return () -> { log.info("结束连接:{}", clientId); removeConnection(clientId); - cancelScheduledFuture(future); + cancelScheduledFuture(clientId); }; } @@ -144,11 +192,11 @@ public class DevSseCacheUtil { * @author diantu * @date 2023/7/3 **/ - public static Runnable timeoutCallBack(String clientId, ScheduledFuture future){ + public static Runnable timeoutCallBack(String clientId){ return ()->{ log.info("连接超时:{}", clientId); removeConnection(clientId); - cancelScheduledFuture(future); + cancelScheduledFuture(clientId); }; } @@ -158,11 +206,11 @@ public class DevSseCacheUtil { * @author diantu * @date 2023/7/3 **/ - public static Consumer errorCallBack(String clientId, ScheduledFuture future) { + public static Consumer errorCallBack(String clientId) { return throwable -> { log.info("推送消息异常:{}", clientId); removeConnection(clientId); - cancelScheduledFuture(future); + cancelScheduledFuture(clientId); }; } diff --git a/snowy-plugin/snowy-plugin-sys/src/main/java/vip/xiaonuo/sys/modular/index/service/impl/SysIndexServiceImpl.java b/snowy-plugin/snowy-plugin-sys/src/main/java/vip/xiaonuo/sys/modular/index/service/impl/SysIndexServiceImpl.java index 1a8c6027..6577406e 100644 --- a/snowy-plugin/snowy-plugin-sys/src/main/java/vip/xiaonuo/sys/modular/index/service/impl/SysIndexServiceImpl.java +++ b/snowy-plugin/snowy-plugin-sys/src/main/java/vip/xiaonuo/sys/modular/index/service/impl/SysIndexServiceImpl.java @@ -125,6 +125,6 @@ public class SysIndexServiceImpl implements SysIndexService { //发送消息 devSseApi.sendMessageToOneClient(m.getClientId(), String.valueOf(unreadMessageNum)); }; - return devSseApi.createSseConnect(clientId,true,consumer); + return devSseApi.createSseConnect(clientId,true,false,consumer); } }