From 73059b8a5314f156d7e22b1497cd3f74a81202ad Mon Sep 17 00:00:00 2001 From: JEECG <445654970@qq.com> Date: Sat, 13 Sep 2025 16:15:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96AIRG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/controller/AiragChatController.java | 14 ++ .../airag/app/service/IAiragChatService.java | 9 + .../service/impl/AiragChatServiceImpl.java | 160 ++++++++++++++++-- .../llm/controller/AiragModelController.java | 15 +- .../modules/airag/llm/entity/AiragModel.java | 7 + .../airag/llm/handler/AIChatHandler.java | 27 ++- .../airag/llm/handler/JeecgToolsProvider.java | 42 +++++ 7 files changed, 251 insertions(+), 23 deletions(-) create mode 100644 jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/llm/handler/JeecgToolsProvider.java diff --git a/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/app/controller/AiragChatController.java b/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/app/controller/AiragChatController.java index 62fec9a3f..3d78320f3 100644 --- a/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/app/controller/AiragChatController.java +++ b/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/app/controller/AiragChatController.java @@ -156,6 +156,20 @@ public class AiragChatController { return chatService.clearMessage(conversationId); } + /** + * 继续接收消息 + * + * @param requestId + * @return + * @author chenrui + * @date 2025/8/11 17:49 + */ + @IgnoreAuth + @GetMapping(value = "/receive/{requestId}") + public SseEmitter receiveByRequestId(@PathVariable(name = "requestId", required = true) String requestId) { + return chatService.receiveByRequestId(requestId); + } + /** * 根据请求ID停止某个请求的处理 diff --git a/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/app/service/IAiragChatService.java b/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/app/service/IAiragChatService.java index fc8e197af..985040893 100644 --- a/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/app/service/IAiragChatService.java +++ b/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/app/service/IAiragChatService.java @@ -102,4 +102,13 @@ public interface IAiragChatService { * @date 2025/4/21 14:17 */ Result initChat(String appId); + + /** + * 继续接收消息 + * @param requestId + * @return + * @author chenrui + * @date 2025/8/11 17:39 + */ + SseEmitter receiveByRequestId(String requestId); } diff --git a/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/app/service/impl/AiragChatServiceImpl.java b/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/app/service/impl/AiragChatServiceImpl.java index 552fdbe37..8874ab00a 100644 --- a/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/app/service/impl/AiragChatServiceImpl.java +++ b/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/app/service/impl/AiragChatServiceImpl.java @@ -8,13 +8,13 @@ import dev.langchain4j.service.TokenStream; import lombok.extern.slf4j.Slf4j; import org.jeecg.common.api.vo.Result; import org.jeecg.common.exception.JeecgBootBizTipException; +import org.jeecg.common.exception.JeecgBootException; import org.jeecg.common.system.api.ISysBaseAPI; import org.jeecg.common.system.util.JwtUtil; import org.jeecg.common.util.*; import org.jeecg.modules.airag.app.consts.AiAppConsts; import org.jeecg.modules.airag.app.entity.AiragApp; import org.jeecg.modules.airag.app.mapper.AiragAppMapper; -import org.jeecg.modules.airag.app.service.IAiragAppService; import org.jeecg.modules.airag.app.service.IAiragChatService; import org.jeecg.modules.airag.app.vo.AppDebugParams; import org.jeecg.modules.airag.app.vo.ChatConversation; @@ -31,6 +31,8 @@ import org.jeecg.modules.airag.flow.consts.FlowConsts; import org.jeecg.modules.airag.flow.service.IAiragFlowService; import org.jeecg.modules.airag.flow.vo.api.FlowRunParams; import org.jeecg.modules.airag.llm.entity.AiragModel; +import org.jeecg.modules.airag.llm.handler.AIChatHandler; +import org.jeecg.modules.airag.llm.handler.JeecgToolsProvider; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.BoundValueOperations; @@ -41,8 +43,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.servlet.http.HttpServletRequest; import java.io.IOException; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -74,6 +75,14 @@ public class AiragChatServiceImpl implements IAiragChatService { @Autowired private RedisUtil redisUtil; + @Autowired + JeecgToolsProvider jeecgToolsProvider; + + /** + * 重新接收消息 + */ + private static final ExecutorService SSE_THREAD_POOL = Executors.newFixedThreadPool(10); // 最大10个线程 + @Override public SseEmitter send(ChatSendParams chatSendParams) { AssertUtils.assertNotEmpty("参数异常", chatSendParams); @@ -148,7 +157,9 @@ public class AiragChatServiceImpl implements IAiragChatService { // 发送完成事件 emitter.send(SseEmitter.event().data(eventData)); } catch (Exception e) { - log.error("终止会话时发生错误", e); + if(!e.getMessage().contains("ResponseBodyEmitter has already completed")){ + log.error("终止会话时发生错误", e); + } try { // 防止异常冒泡 emitter.completeWithError(e); @@ -250,6 +261,96 @@ public class AiragChatServiceImpl implements IAiragChatService { return Result.ok(app); } + @Override + public SseEmitter receiveByRequestId(String requestId) { + AssertUtils.assertNotEmpty("请选择会话",requestId); + if(AiragLocalCache.get(AiragConsts.CACHE_TYPE_SSE, requestId) == null){ + return null; + } + List datas = AiragLocalCache.get(AiragConsts.CACHE_TYPE_SSE_HISTORY_MSG, requestId); + if(null == datas){ + return null; + } + SseEmitter emitter = createSSE(requestId); + // 120秒 + final long timeoutMillis = 120_000L; + // 使用线程池提交任务 + SSE_THREAD_POOL.submit(() -> { + int lastIndex = 0; + long lastActiveTime = System.currentTimeMillis(); + try { + while (true) { + if(lastIndex < datas.size()) { + try { + EventData eventData = datas.get(lastIndex++); + String eventStr = JSONObject.toJSONString(eventData); + log.debug("[AI应用]继续接收-接收LLM返回消息:{}", eventStr); + emitter.send(SseEmitter.event().data(eventStr)); + // 有新消息,重置计时 + lastActiveTime = System.currentTimeMillis(); + } catch (IOException e) { + log.error("[AI应用]继续接收-发送消息失败"); + } + } else { + // 没有新消息了 + if (AiragLocalCache.get(AiragConsts.CACHE_TYPE_SSE, requestId) == null) { + // 主线程sse已经被移除,退出线程. + log.info("[AI应用]继续接收-SSE消息推送完成: {}", requestId); + break; + } else if (System.currentTimeMillis() - lastActiveTime > timeoutMillis) { + // 主线程未结束,等待超时, + log.warn("[AI应用]继续接收-等待消息更新超时,释放线程: {}", requestId); + break; + } else { + // 主线程未结束, 未超时, 休眠一会再查 + log.warn("[AI应用]继续接收-等待消息更新: {}", requestId); + Thread.sleep(500); + } + } + } + } catch (Exception e) { + log.error("SSE消息推送异常", e); + } finally { + try { + // 发送完成事件 + emitter.send(SseEmitter.event().data(new EventData(requestId, null, EventData.EVENT_MESSAGE_END))); + } catch (Exception e) { + log.error("终止会话时发生错误", e); + try { + // 防止异常冒泡 + emitter.completeWithError(e); + } catch (Exception ignore) {} + } finally { + // 关闭emitter + try { + emitter.complete(); + } catch (Exception ignore) {} + } + } + }); + return emitter; + } + + /** + * 创建SSE + * @param requestId + * @return + * @author chenrui + * @date 2025/8/12 15:30 + */ + private static SseEmitter createSSE(String requestId) { + SseEmitter emitter = new SseEmitter(-0L); + emitter.onError(throwable -> { + log.warn("SEE向客户端发送消息失败: {}", throwable.getMessage()); + AiragLocalCache.remove(AiragConsts.CACHE_TYPE_SSE, requestId); + AiragLocalCache.remove(AiragConsts.CACHE_TYPE_SSE_SEND_TIME, requestId); + try { + emitter.complete(); + } catch (Exception ignore) {} + }); + return emitter; + } + @Override public Result deleteConversation(String conversationId) { AssertUtils.assertNotEmpty("请选择要删除的会话", conversationId); @@ -522,22 +623,14 @@ public class AiragChatServiceImpl implements IAiragChatService { AiragApp aiApp = chatConversation.getApp(); // 每次会话都生成一个新的,用来缓存emitter String requestId = UUIDGenerator.generate(); - SseEmitter emitter = new SseEmitter(-0L); - emitter.onError(throwable -> { - log.warn("SEE向客户端发送消息失败: {}", throwable.getMessage()); - AiragLocalCache.remove(AiragConsts.CACHE_TYPE_SSE, requestId); - try { - emitter.complete(); - } catch (Exception ignore) {} - }); - EventData eventRequestId = new EventData(requestId, null, EventData.EVENT_INIT_REQUEST_ID, chatConversation.getId(), topicId); - eventRequestId.setData(EventMessageData.builder().message("").build()); - sendMessage2Client(emitter, eventRequestId); + SseEmitter emitter = createSSE(requestId); // 缓存emitter AiragLocalCache.put(AiragConsts.CACHE_TYPE_SSE, requestId, emitter); // 缓存开始发送时间 log.info("[AI-CHAT]开始发送消息,requestId:{}", requestId); AiragLocalCache.put(AiragConsts.CACHE_TYPE_SSE_SEND_TIME, requestId, System.currentTimeMillis()); + // 初始化历史消息缓存 + AiragLocalCache.put(AiragConsts.CACHE_TYPE_SSE_HISTORY_MSG, requestId, new CopyOnWriteArrayList<>()); try { // 组装用户消息 UserMessage userMessage = aiChatHandler.buildUserMessage(sendParams.getContent(), sendParams.getImages()); @@ -561,6 +654,10 @@ public class AiragChatServiceImpl implements IAiragChatService { // 发消息 sendWithDefault(requestId, chatConversation, topicId, null, messages, null); } + // 发送就绪消息 + EventData eventRequestId = new EventData(requestId, null, EventData.EVENT_INIT_REQUEST_ID, chatConversation.getId(), topicId); + eventRequestId.setData(EventMessageData.builder().message("").build()); + sendMessage2Client(emitter, eventRequestId); } catch (Throwable e) { log.error(e.getMessage(), e); EventData eventData = new EventData(requestId, null, EventData.EVENT_FLOW_ERROR, chatConversation.getId(), topicId); @@ -725,6 +822,10 @@ public class AiragChatServiceImpl implements IAiragChatService { if (null == aiChatParams) { aiChatParams = new AIChatParams(); } + // 如果是默认app,加载系统默认工具 + if(chatConversation.getApp().getId().equals(AiAppConsts.DEFAULT_APP_ID)){ + aiChatParams.setTools(jeecgToolsProvider.getDefaultTools()); + } aiChatParams.setKnowIds(chatConversation.getApp().getKnowIds()); aiChatParams.setMaxMsgNumber(oConvertUtils.getInt(chatConversation.getApp().getMsgNum(), 5)); HttpServletRequest httpRequest = SpringContextUtils.getHttpServletRequest(); @@ -739,6 +840,19 @@ public class AiragChatServiceImpl implements IAiragChatService { } } catch (Exception e) { log.error(e.getMessage(), e); + // sse + SseEmitter emitter = AiragLocalCache.get(AiragConsts.CACHE_TYPE_SSE, requestId); + if (null == emitter) { + log.warn("[AI应用]接收LLM返回会话已关闭{}", requestId); + return; + } + String errMsg = "调用大模型接口失败,详情请查看后台日志。"; + if(e instanceof JeecgBootException){ + errMsg = e.getMessage(); + } + EventData eventData = new EventData(requestId, null, EventData.EVENT_FLOW_ERROR, chatConversation.getId(), topicId); + eventData.setData(EventFlowData.builder().success(false).message(errMsg).build()); + closeSSE(emitter, eventData); throw new JeecgBootBizTipException("调用大模型接口失败:" + e.getMessage()); } /** @@ -808,7 +922,7 @@ public class AiragChatServiceImpl implements IAiragChatService { // 异常结束 log.error("调用模型异常:" + respText); if (respText.contains("insufficient Balance")) { - respText = "大预言模型账号余额不足!"; + respText = "大语言模型账号余额不足!"; } EventData eventData = new EventData(requestId, null, EventData.EVENT_FLOW_ERROR, chatConversation.getId(), topicId); eventData.setData(EventFlowData.builder().success(false).message(respText).build()); @@ -837,6 +951,14 @@ public class AiragChatServiceImpl implements IAiragChatService { //update-end---author:chenrui ---date:20250425 for:[QQYUN-12203]AI 聊天,超时或者服务器报错,给个友好提示------------ } else { errMsg = "调用大模型接口失败,详情请查看后台日志。"; + // 根据常见异常关键字做细致翻译 + for (Map.Entry entry : AIChatHandler.MODEL_ERROR_MAP.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (error.getMessage().contains(key)) { + errMsg = value; + } + } EventData eventData = new EventData(requestId, null, EventData.EVENT_FLOW_ERROR, chatConversation.getId(), topicId); eventData.setData(EventFlowData.builder().success(false).message(errMsg).build()); closeSSE(emitter, eventData); @@ -858,6 +980,12 @@ public class AiragChatServiceImpl implements IAiragChatService { String eventStr = JSONObject.toJSONString(eventData); log.debug("[AI应用]接收LLM返回消息:{}", eventStr); emitter.send(SseEmitter.event().data(eventStr)); + List historyMsg = AiragLocalCache.get(AiragConsts.CACHE_TYPE_SSE_HISTORY_MSG, eventData.getRequestId()); + if (null == historyMsg) { + historyMsg = new CopyOnWriteArrayList<>(); + AiragLocalCache.put(AiragConsts.CACHE_TYPE_SSE_HISTORY_MSG, eventData.getRequestId(), historyMsg); + } + historyMsg.add(eventData); } catch (IOException e) { log.error("发送消息失败", e); } diff --git a/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/llm/controller/AiragModelController.java b/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/llm/controller/AiragModelController.java index 990d360ab..4db2aa2e2 100644 --- a/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/llm/controller/AiragModelController.java +++ b/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/llm/controller/AiragModelController.java @@ -15,6 +15,7 @@ import org.jeecg.common.system.base.controller.JeecgController; import org.jeecg.common.system.query.QueryGenerator; import org.jeecg.common.util.AssertUtils; import org.jeecg.common.util.TokenUtils; +import org.jeecg.common.util.oConvertUtils; import org.jeecg.config.mybatis.MybatisPlusSaasConfig; import org.jeecg.modules.airag.llm.consts.LLMConsts; import org.jeecg.modules.airag.llm.entity.AiragModel; @@ -77,6 +78,11 @@ public class AiragModelController extends JeecgController messages, AIChatParams params) { params = mergeParams(airagModel, params); - String resp = llmHandler.completions(messages, params); + String resp; + try { + resp = llmHandler.completions(messages, params); + } catch (Exception e) { + // langchain4j 异常友好提示 + String errMsg = "调用大模型接口失败,详情请查看后台日志。"; + if (oConvertUtils.isNotEmpty(e.getMessage())) { +// // 根据常见异常关键字做细致翻译 +// for (Map.Entry entry : MODEL_ERROR_MAP.entrySet()) { +// String key = entry.getKey(); +// String value = entry.getValue(); +// if (errMsg.contains(key)) { +// errMsg = value; +// } +// } + } + log.error("AI模型调用异常: {}", errMsg, e); + throw new JeecgBootException(errMsg); + } if (resp.contains("") && (null == params.getNoThinking() || params.getNoThinking())) { String[] thinkSplit = resp.split(""); @@ -151,6 +169,7 @@ public class AIChatHandler implements IAIChatHandler { AssertUtils.assertNotEmpty("请选择模型", modelId); AiragModel airagModel = airagModelMapper.getByIdIgnoreTenant(modelId); + AssertUtils.assertSame("模型未激活,请先在[AI模型配置]中[测试激活]模型", airagModel.getActivateFlag(), 1); return chat(airagModel, messages, params); } diff --git a/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/llm/handler/JeecgToolsProvider.java b/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/llm/handler/JeecgToolsProvider.java new file mode 100644 index 000000000..9b73e5d26 --- /dev/null +++ b/jeecg-boot/jeecg-boot-module/jeecg-boot-module-airag/src/main/java/org/jeecg/modules/airag/llm/handler/JeecgToolsProvider.java @@ -0,0 +1,42 @@ +package org.jeecg.modules.airag.llm.handler; + +import dev.langchain4j.agent.tool.ToolSpecification; +import dev.langchain4j.service.tool.ToolExecutor; +import lombok.Getter; + +import java.util.Map; + +/** + * for [QQYUN-13565]【AI助手】新增创建用户和查询用户的工具扩展 + * @Description: jeecg llm工具提供者 + * @Author: chenrui + * @Date: 2025/8/26 18:06 + */ +public interface JeecgToolsProvider { + + /** + * 获取默认的工具列表 + * @return + * @author chenrui + * @date 2025/8/27 09:49 + */ + public Map getDefaultTools(); + + /** + * jeecgLlm工具类 + * @author chenrui + * @date 2025/8/27 09:49 + */ + @Getter + class JeecgLlmTools{ + ToolSpecification toolSpecification; + ToolExecutor toolExecutor; + + public JeecgLlmTools(ToolSpecification toolSpecification, ToolExecutor toolExecutor) { + this.toolSpecification = toolSpecification; + this.toolExecutor = toolExecutor; + } + + + } +}