【新增】增加SSE消息推送模块,实现右上角站内信徽标数推送

【修复】修复个人中心-我的消息 关闭详情页是否已读未刷新的问题
pull/135/head
diant 2023-07-18 16:11:43 +08:00
parent be63db01f7
commit d77945a427
7 changed files with 90 additions and 32 deletions

View File

@ -28,13 +28,14 @@ public interface DevSseApi {
* SSE * SSE
* *
* @param clientId id, * @param clientId id,
* @param setHeartBeat ,falsetrue: false: * @param setHeartBeat ,falsetrue: false:
* @param consumer ,ConsumeracceptsetHeartBeattrue * @param defaultHeartbeat 使
* @param consumer ,ConsumeracceptsetHeartBeattrue,defaultHeartbeatfalse
* @return id,0 * @return id,0
* @author diantu * @author diantu
* @date 2023/7/5 * @date 2023/7/5
**/ **/
public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer<CommonSseParam> consumer); public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer<CommonSseParam> consumer);
/** /**
* *

View File

@ -48,9 +48,11 @@ public class DevSseEmitterController {
@ApiOperationSupport(order = 1) @ApiOperationSupport(order = 1)
@ApiOperation("创建sse连接") @ApiOperation("创建sse连接")
@GetMapping("/dev/sse/createConnect") @GetMapping("/dev/sse/createConnect")
public SseEmitter createConnect(String clientId, @RequestParam(required = false)Boolean setHeartBeat, public SseEmitter createConnect(String clientId,
@RequestParam(required = false)Boolean setHeartBeat,
@RequestParam(required = false)Boolean defaultHeartbeat,
@RequestParam(required = false)Consumer<CommonSseParam> consumer){ @RequestParam(required = false)Consumer<CommonSseParam> consumer){
return devSseEmitterService.createSseConnect(clientId,setHeartBeat,consumer); return devSseEmitterService.createSseConnect(clientId,setHeartBeat,defaultHeartbeat,consumer);
} }
/** /**

View File

@ -39,8 +39,8 @@ public class DevSseProvider implements DevSseApi {
* @date 2023/7/5 * @date 2023/7/5
**/ **/
@Override @Override
public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer<CommonSseParam> consumer) { public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer<CommonSseParam> consumer) {
return devSseEmitterService.createSseConnect(clientId,setHeartBeat,consumer); return devSseEmitterService.createSseConnect(clientId,setHeartBeat,defaultHeartbeat,consumer);
} }
/** /**

View File

@ -30,7 +30,7 @@ public interface DevSseEmitterService {
* @author diantu * @author diantu
* @date 2023/7/3 * @date 2023/7/3
**/ **/
public SseEmitter createSseConnect(String clientId,Boolean setHeartBeat, Consumer<CommonSseParam> consumer); public SseEmitter createSseConnect(String clientId,Boolean setHeartBeat,Boolean defaultHeartbeat,Consumer<CommonSseParam> consumer);
/** /**
* *

View File

@ -47,7 +47,7 @@ public class DevSseEmitterServiceImpl implements DevSseEmitterService {
* @date 2023/7/3 * @date 2023/7/3
**/ **/
@Override @Override
public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer<CommonSseParam> consumer) { public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer<CommonSseParam> consumer) {
// 设置超时时间0表示不过期。默认30秒超过时间未完成会抛出异常AsyncRequestTimeoutException // 设置超时时间0表示不过期。默认30秒超过时间未完成会抛出异常AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L); SseEmitter sseEmitter = new SseEmitter(0L);
String loginId = StpUtil.getLoginIdAsString(); String loginId = StpUtil.getLoginIdAsString();
@ -63,25 +63,32 @@ public class DevSseEmitterServiceImpl implements DevSseEmitterService {
final ScheduledFuture<?> future; final ScheduledFuture<?> future;
// 是否自定义心跳任务 // 是否自定义心跳任务
if (setHeartBeat!=null&&setHeartBeat) { if (setHeartBeat!=null&&setHeartBeat) {
CommonSseParam commonSseParam = new CommonSseParam(); //是否使用默认心跳任务
commonSseParam.setClientId(clientId); if(defaultHeartbeat!=null&&defaultHeartbeat){
commonSseParam.setLoginId(loginId); //默认心跳任务
future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam), future = heartbeatExecutors.scheduleAtFixedRate(() ->
2, 10, TimeUnit.SECONDS); DevSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId),
2, 10, TimeUnit.SECONDS);
}else{
//自定义心跳任务
CommonSseParam commonSseParam = new CommonSseParam();
commonSseParam.setClientId(clientId);
commonSseParam.setLoginId(loginId);
future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam),
2, 10, TimeUnit.SECONDS);
}
// 增加连接
DevSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future);
} else { } else {
//默认心跳任务 // 增加连接
future = heartbeatExecutors.scheduleAtFixedRate(() -> DevSseCacheUtil.addConnection(clientId, loginId, sseEmitter, null);
DevSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId),
2, 10, TimeUnit.SECONDS);
} }
// 长链接完成后回调(即关闭连接时调用) // 长链接完成后回调(即关闭连接时调用)
sseEmitter.onCompletion(DevSseCacheUtil.completionCallBack(clientId,future)); sseEmitter.onCompletion(DevSseCacheUtil.completionCallBack(clientId));
// 连接超时回调 // 连接超时回调
sseEmitter.onTimeout(DevSseCacheUtil.timeoutCallBack(clientId,future)); sseEmitter.onTimeout(DevSseCacheUtil.timeoutCallBack(clientId));
// 推送消息异常回调 // 推送消息异常回调
sseEmitter.onError(DevSseCacheUtil.errorCallBack(clientId,future)); sseEmitter.onError(DevSseCacheUtil.errorCallBack(clientId));
// 增加连接
DevSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future);
// 初次建立连接,推送客户端id // 初次建立连接,推送客户端id
CommonResult<String> message = new CommonResult<>(0,"",clientId); CommonResult<String> message = new CommonResult<>(0,"",clientId);
DevSseCacheUtil.sendMessageToClientByClientId(clientId,message); DevSseCacheUtil.sendMessageToClientByClientId(clientId,message);

View File

@ -55,6 +55,53 @@ public class DevSseCacheUtil {
return (SseEmitter) map.get(DevSseEmitterParameterEnum.EMITTER.getValue()); return (SseEmitter) map.get(DevSseEmitterParameterEnum.EMITTER.getValue());
} }
/**
* id
*
* @author diantu
* @date 2023/7/18
**/
public static ScheduledFuture<?> getSseFutureByClientId(String clientId) {
Map<String,Object> map = sseCache.get(clientId);
if (map == null || map.isEmpty()) {
return null;
}
return (ScheduledFuture<?>) map.get(DevSseEmitterParameterEnum.FUTURE.getValue());
}
/**
* idid
*
* @author diantu
* @date 2023/7/18
**/
public static ScheduledFuture<?> getLoginIdByClientId(String clientId) {
Map<String,Object> map = sseCache.get(clientId);
if (map == null || map.isEmpty()) {
return null;
}
return (ScheduledFuture<?>) map.get(DevSseEmitterParameterEnum.LOGINID.getValue());
}
/**
* idid
*
* @author diantu
* @date 2023/7/18
**/
public static String getClientIdByLoginId(String loginId){
if(existSseCache()){
for (Map.Entry<String, Map<String, Object>> entry : sseCache.entrySet()) {
Map<String,Object> map = sseCache.get(entry.getKey());
String lId = (String) map.get(DevSseEmitterParameterEnum.LOGINID.getValue());
if(loginId.equals(lId)){
return entry.getKey();
}
}
}
return null;
}
/** /**
* *
* *
@ -105,7 +152,7 @@ public class DevSseCacheUtil {
public static void removeConnection(String clientId) { public static void removeConnection(String clientId) {
SseEmitter emitter = getSseEmitterByClientId(clientId); SseEmitter emitter = getSseEmitterByClientId(clientId);
if (emitter != null) { if (emitter != null) {
cancelScheduledFuture((ScheduledFuture<?>) sseCache.get(clientId).get(DevSseEmitterParameterEnum.FUTURE.getValue())); cancelScheduledFuture(clientId);
} }
sseCache.remove(clientId); sseCache.remove(clientId);
log.info("移除连接:{}", clientId); log.info("移除连接:{}", clientId);
@ -117,7 +164,8 @@ public class DevSseCacheUtil {
* @author diantu * @author diantu
* @date 2023/7/3 * @date 2023/7/3
*/ */
public static void cancelScheduledFuture(ScheduledFuture<?> future){ public static void cancelScheduledFuture(String clientId){
ScheduledFuture<?> future = getSseFutureByClientId(clientId);
if (future != null) { if (future != null) {
future.cancel(true); future.cancel(true);
} }
@ -130,11 +178,11 @@ public class DevSseCacheUtil {
* @author diantu * @author diantu
* @date 2023/7/3 * @date 2023/7/3
**/ **/
public static Runnable completionCallBack(String clientId, ScheduledFuture<?> future) { public static Runnable completionCallBack(String clientId) {
return () -> { return () -> {
log.info("结束连接:{}", clientId); log.info("结束连接:{}", clientId);
removeConnection(clientId); removeConnection(clientId);
cancelScheduledFuture(future); cancelScheduledFuture(clientId);
}; };
} }
@ -144,11 +192,11 @@ public class DevSseCacheUtil {
* @author diantu * @author diantu
* @date 2023/7/3 * @date 2023/7/3
**/ **/
public static Runnable timeoutCallBack(String clientId, ScheduledFuture<?> future){ public static Runnable timeoutCallBack(String clientId){
return ()->{ return ()->{
log.info("连接超时:{}", clientId); log.info("连接超时:{}", clientId);
removeConnection(clientId); removeConnection(clientId);
cancelScheduledFuture(future); cancelScheduledFuture(clientId);
}; };
} }
@ -158,11 +206,11 @@ public class DevSseCacheUtil {
* @author diantu * @author diantu
* @date 2023/7/3 * @date 2023/7/3
**/ **/
public static Consumer<Throwable> errorCallBack(String clientId, ScheduledFuture<?> future) { public static Consumer<Throwable> errorCallBack(String clientId) {
return throwable -> { return throwable -> {
log.info("推送消息异常:{}", clientId); log.info("推送消息异常:{}", clientId);
removeConnection(clientId); removeConnection(clientId);
cancelScheduledFuture(future); cancelScheduledFuture(clientId);
}; };
} }

View File

@ -125,6 +125,6 @@ public class SysIndexServiceImpl implements SysIndexService {
//发送消息 //发送消息
devSseApi.sendMessageToOneClient(m.getClientId(), String.valueOf(unreadMessageNum)); devSseApi.sendMessageToOneClient(m.getClientId(), String.valueOf(unreadMessageNum));
}; };
return devSseApi.createSseConnect(clientId,true,consumer); return devSseApi.createSseConnect(clientId,true,false,consumer);
} }
} }