websocket优化代码

pull/4077/head
zhangdaiscott 2022-09-22 15:50:56 +08:00
parent f1d62a418f
commit 8dc91c33e5
2 changed files with 96 additions and 83 deletions

View File

@ -9,11 +9,12 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
* (redis) * (redis)
* redisredisws
* @author: jeecg-boot * @author: jeecg-boot
*/ */
@Slf4j @Slf4j
@Component @Component(WebSocket.REDIS_TOPIC_NAME)
public class SocketHandler implements JeecgRedisListener { public class SocketHandler implements JeecgRedisListener {
@Autowired @Autowired
@ -21,15 +22,17 @@ public class SocketHandler implements JeecgRedisListener {
@Override @Override
public void onMessage(BaseMap map) { public void onMessage(BaseMap map) {
log.info("【SocketHandler消息】Redis Listerer:" + map.toString()); log.info("【Redis发布订阅模式】redis Listener: {},参数:{}",WebSocket.REDIS_TOPIC_NAME, map.toString());
String userId = map.get("userId"); String userId = map.get("userId");
String message = map.get("message"); String message = map.get("message");
if (ObjectUtil.isNotEmpty(userId)) { if (ObjectUtil.isNotEmpty(userId)) {
//pc端消息推送具体人
webSocket.pushMessage(userId, message); webSocket.pushMessage(userId, message);
//app端消息推送 //app端消息推送具体人
webSocket.pushMessage(userId+CommonSendStatus.APP_SESSION_SUFFIX, message); webSocket.pushMessage(userId+CommonSendStatus.APP_SESSION_SUFFIX, message);
} else { } else {
//推送全部
webSocket.pushMessage(message); webSocket.pushMessage(message);
} }

View File

@ -1,25 +1,17 @@
package org.jeecg.modules.message.websocket; package org.jeecg.modules.message.websocket;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.websocket.OnClose; import javax.websocket.*;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam; import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import com.alibaba.fastjson.JSONObject;
import org.jeecg.common.base.BaseMap; import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.WebsocketConst; import org.jeecg.common.constant.WebsocketConst;
import org.jeecg.common.modules.redis.client.JeecgRedisClient; import org.jeecg.common.modules.redis.client.JeecgRedisClient;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
@ -31,109 +23,126 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
@ServerEndpoint("/websocket/{userId}") @ServerEndpoint("/websocket/{userId}")
public class WebSocket { public class WebSocket {
private Session session; /**线程安全Map*/
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
/** /**
* ID * Redis
*/ */
private String userId; public static final String REDIS_TOPIC_NAME = "socketHandler";
private static final String REDIS_TOPIC_NAME = "socketHandler";
@Resource @Resource
private JeecgRedisClient jeecgRedisClient; private JeecgRedisClient jeecgRedisClient;
/**
* webSocketclass
*/
private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
/**
* 线Map
*/
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
//==========【websocket接受、推送消息等方法 —— 具体服务节点推送ws消息】========================================================================================
@OnOpen @OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) { public void onOpen(Session session, @PathParam(value = "userId") String userId) {
try { try {
//TODO 通过header中获取token进行check
this.session = session;
this.userId = userId;
webSockets.add(this);
sessionPool.put(userId, session); sessionPool.put(userId, session);
log.info("【websocket消息】有新的连接总数为:" + webSockets.size()); log.info("【系统 WebSocket】有新的连接总数为:" + sessionPool.size());
} catch (Exception e) { } catch (Exception e) {
} }
} }
@OnClose @OnClose
public void onClose() { public void onClose(@PathParam("userId") String userId) {
try { try {
webSockets.remove(this); sessionPool.remove(userId);
sessionPool.remove(this.userId); log.info("【系统 WebSocket】连接断开总数为:" + sessionPool.size());
log.info("【websocket消息】连接断开总数为:" + webSockets.size());
} catch (Exception e) {
}
}
/**
*
*
* @param userId
* @param message
*/
public void pushMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
//update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
synchronized (session){
log.info("【websocket消息】 单点消息:" + message);
session.getBasicRemote().sendText(message);
}
//update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
*
*/
public void pushMessage(String message) {
try {
webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
/**
* ws
*
* @param userId
* @param message
*/
public void pushMessage(String userId, String message) {
for (Map.Entry<String, Session> item : sessionPool.entrySet()) {
//userId key值= {用户id + "_"+ 登录token的md5串}
//TODO vue2未改key新规则暂时不影响逻辑
if (item.getKey().contains(userId)) {
Session session = item.getValue();
try {
//update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
synchronized (session){
log.info("【系统 WebSocket】推送单人消息:" + message);
session.getBasicRemote().sendText(message);
}
//update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
} catch (Exception e) {
log.error(e.getMessage(),e);
}
}
}
}
/**
* ws
*/
public void pushMessage(String message) {
try {
for (Map.Entry<String, Session> item : sessionPool.entrySet()) {
try {
item.getValue().getAsyncRemote().sendText(message);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
log.info("【系统 WebSocket】群发消息:" + message);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/**
* ws
*/
@OnMessage @OnMessage
public void onMessage(String message) { public void onMessage(String message, @PathParam(value = "userId") String userId) {
//todo 现在有个定时任务刷,应该去掉 if(!"ping".equals(message) && !WebsocketConst.CMD_CHECK.equals(message)){
log.debug("【websocket消息】收到客户端消息:" + message); log.info("【系统 WebSocket】收到客户端消息:" + message);
}else{
log.debug("【系统 WebSocket】收到客户端消息:" + message);
}
//------------------------------------------------------------------------------
JSONObject obj = new JSONObject(); JSONObject obj = new JSONObject();
//业务类型 //业务类型
obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK); obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
//消息内容 //消息内容
obj.put(WebsocketConst.MSG_TXT, "心跳响应"); obj.put(WebsocketConst.MSG_TXT, "心跳响应");
//update-begin-author:taoyan date:20220308 for: 消息通知长连接启动心跳机制后端代码小bug #3473 this.pushMessage(userId, obj.toJSONString());
for (WebSocket webSocket : webSockets) { //------------------------------------------------------------------------------
webSocket.pushMessage(obj.toJSONString());
}
//update-end-author:taoyan date:20220308 for: 消息通知长连接启动心跳机制后端代码小bug #3473
} }
/**
*
*
* @param session
* @param t
*/
@OnError
public void onError(Session session, Throwable t) {
log.warn("【系统 WebSocket】消息出现错误");
//t.printStackTrace();
}
//==========【系统 WebSocket接受、推送消息等方法 —— 具体服务节点推送ws消息】========================================================================================
//==========【采用redis发布订阅模式——推送消息】========================================================================================
/** /**
* redis * redis
* *
* @param message * @param message
*/ */
public void sendMessage(String message) { public void sendMessage(String message) {
log.info("【websocket消息】广播消息:" + message); //log.info("【系统 WebSocket】广播消息:" + message);
BaseMap baseMap = new BaseMap(); BaseMap baseMap = new BaseMap();
baseMap.put("userId", ""); baseMap.put("userId", "");
baseMap.put("message", message); baseMap.put("message", message);
@ -141,7 +150,7 @@ public class WebSocket {
} }
/** /**
* * redis
* *
* @param userId * @param userId
* @param message * @param message
@ -154,7 +163,7 @@ public class WebSocket {
} }
/** /**
* () * () redis
* *
* @param userIds * @param userIds
* @param message * @param message
@ -164,5 +173,6 @@ public class WebSocket {
sendMessage(userId, message); sendMessage(userId, message);
} }
} }
//=======【采用redis发布订阅模式——推送消息】==========================================================================================
} }