优化AIRG

pull/8786/merge
JEECG 2025-09-13 16:15:23 +08:00
parent fd60e49f5b
commit 73059b8a53
7 changed files with 251 additions and 23 deletions

View File

@ -156,6 +156,20 @@ public class AiragChatController {
return chatService.clearMessage(conversationId);
}
/**
*
*
* @param requestId
* @return
* @author chenrui
* @date 2025/8/11 17:49
*/
@IgnoreAuth
@GetMapping(value = "/receive/{requestId}")
public SseEmitter receiveByRequestId(@PathVariable(name = "requestId", required = true) String requestId) {
return chatService.receiveByRequestId(requestId);
}
/**
* ID

View File

@ -102,4 +102,13 @@ public interface IAiragChatService {
* @date 2025/4/21 14:17
*/
Result<?> initChat(String appId);
/**
*
* @param requestId
* @return
* @author chenrui
* @date 2025/8/11 17:39
*/
SseEmitter receiveByRequestId(String requestId);
}

View File

@ -8,13 +8,13 @@ import dev.langchain4j.service.TokenStream;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.exception.JeecgBootBizTipException;
import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.system.api.ISysBaseAPI;
import org.jeecg.common.system.util.JwtUtil;
import org.jeecg.common.util.*;
import org.jeecg.modules.airag.app.consts.AiAppConsts;
import org.jeecg.modules.airag.app.entity.AiragApp;
import org.jeecg.modules.airag.app.mapper.AiragAppMapper;
import org.jeecg.modules.airag.app.service.IAiragAppService;
import org.jeecg.modules.airag.app.service.IAiragChatService;
import org.jeecg.modules.airag.app.vo.AppDebugParams;
import org.jeecg.modules.airag.app.vo.ChatConversation;
@ -31,6 +31,8 @@ import org.jeecg.modules.airag.flow.consts.FlowConsts;
import org.jeecg.modules.airag.flow.service.IAiragFlowService;
import org.jeecg.modules.airag.flow.vo.api.FlowRunParams;
import org.jeecg.modules.airag.llm.entity.AiragModel;
import org.jeecg.modules.airag.llm.handler.AIChatHandler;
import org.jeecg.modules.airag.llm.handler.JeecgToolsProvider;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundValueOperations;
@ -41,8 +43,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@ -74,6 +75,14 @@ public class AiragChatServiceImpl implements IAiragChatService {
@Autowired
private RedisUtil redisUtil;
@Autowired
JeecgToolsProvider jeecgToolsProvider;
/**
*
*/
private static final ExecutorService SSE_THREAD_POOL = Executors.newFixedThreadPool(10); // 最大10个线程
@Override
public SseEmitter send(ChatSendParams chatSendParams) {
AssertUtils.assertNotEmpty("参数异常", chatSendParams);
@ -148,7 +157,9 @@ public class AiragChatServiceImpl implements IAiragChatService {
// 发送完成事件
emitter.send(SseEmitter.event().data(eventData));
} catch (Exception e) {
log.error("终止会话时发生错误", e);
if(!e.getMessage().contains("ResponseBodyEmitter has already completed")){
log.error("终止会话时发生错误", e);
}
try {
// 防止异常冒泡
emitter.completeWithError(e);
@ -250,6 +261,96 @@ public class AiragChatServiceImpl implements IAiragChatService {
return Result.ok(app);
}
@Override
public SseEmitter receiveByRequestId(String requestId) {
AssertUtils.assertNotEmpty("请选择会话",requestId);
if(AiragLocalCache.get(AiragConsts.CACHE_TYPE_SSE, requestId) == null){
return null;
}
List<EventData> datas = AiragLocalCache.get(AiragConsts.CACHE_TYPE_SSE_HISTORY_MSG, requestId);
if(null == datas){
return null;
}
SseEmitter emitter = createSSE(requestId);
// 120秒
final long timeoutMillis = 120_000L;
// 使用线程池提交任务
SSE_THREAD_POOL.submit(() -> {
int lastIndex = 0;
long lastActiveTime = System.currentTimeMillis();
try {
while (true) {
if(lastIndex < datas.size()) {
try {
EventData eventData = datas.get(lastIndex++);
String eventStr = JSONObject.toJSONString(eventData);
log.debug("[AI应用]继续接收-接收LLM返回消息:{}", eventStr);
emitter.send(SseEmitter.event().data(eventStr));
// 有新消息,重置计时
lastActiveTime = System.currentTimeMillis();
} catch (IOException e) {
log.error("[AI应用]继续接收-发送消息失败");
}
} else {
// 没有新消息了
if (AiragLocalCache.get(AiragConsts.CACHE_TYPE_SSE, requestId) == null) {
// 主线程sse已经被移除,退出线程.
log.info("[AI应用]继续接收-SSE消息推送完成: {}", requestId);
break;
} else if (System.currentTimeMillis() - lastActiveTime > timeoutMillis) {
// 主线程未结束,等待超时,
log.warn("[AI应用]继续接收-等待消息更新超时,释放线程: {}", requestId);
break;
} else {
// 主线程未结束, 未超时, 休眠一会再查
log.warn("[AI应用]继续接收-等待消息更新: {}", requestId);
Thread.sleep(500);
}
}
}
} catch (Exception e) {
log.error("SSE消息推送异常", e);
} finally {
try {
// 发送完成事件
emitter.send(SseEmitter.event().data(new EventData(requestId, null, EventData.EVENT_MESSAGE_END)));
} catch (Exception e) {
log.error("终止会话时发生错误", e);
try {
// 防止异常冒泡
emitter.completeWithError(e);
} catch (Exception ignore) {}
} finally {
// 关闭emitter
try {
emitter.complete();
} catch (Exception ignore) {}
}
}
});
return emitter;
}
/**
* SSE
* @param requestId
* @return
* @author chenrui
* @date 2025/8/12 15:30
*/
private static SseEmitter createSSE(String requestId) {
SseEmitter emitter = new SseEmitter(-0L);
emitter.onError(throwable -> {
log.warn("SEE向客户端发送消息失败: {}", throwable.getMessage());
AiragLocalCache.remove(AiragConsts.CACHE_TYPE_SSE, requestId);
AiragLocalCache.remove(AiragConsts.CACHE_TYPE_SSE_SEND_TIME, requestId);
try {
emitter.complete();
} catch (Exception ignore) {}
});
return emitter;
}
@Override
public Result<?> deleteConversation(String conversationId) {
AssertUtils.assertNotEmpty("请选择要删除的会话", conversationId);
@ -522,22 +623,14 @@ public class AiragChatServiceImpl implements IAiragChatService {
AiragApp aiApp = chatConversation.getApp();
// 每次会话都生成一个新的,用来缓存emitter
String requestId = UUIDGenerator.generate();
SseEmitter emitter = new SseEmitter(-0L);
emitter.onError(throwable -> {
log.warn("SEE向客户端发送消息失败: {}", throwable.getMessage());
AiragLocalCache.remove(AiragConsts.CACHE_TYPE_SSE, requestId);
try {
emitter.complete();
} catch (Exception ignore) {}
});
EventData eventRequestId = new EventData(requestId, null, EventData.EVENT_INIT_REQUEST_ID, chatConversation.getId(), topicId);
eventRequestId.setData(EventMessageData.builder().message("").build());
sendMessage2Client(emitter, eventRequestId);
SseEmitter emitter = createSSE(requestId);
// 缓存emitter
AiragLocalCache.put(AiragConsts.CACHE_TYPE_SSE, requestId, emitter);
// 缓存开始发送时间
log.info("[AI-CHAT]开始发送消息,requestId:{}", requestId);
AiragLocalCache.put(AiragConsts.CACHE_TYPE_SSE_SEND_TIME, requestId, System.currentTimeMillis());
// 初始化历史消息缓存
AiragLocalCache.put(AiragConsts.CACHE_TYPE_SSE_HISTORY_MSG, requestId, new CopyOnWriteArrayList<>());
try {
// 组装用户消息
UserMessage userMessage = aiChatHandler.buildUserMessage(sendParams.getContent(), sendParams.getImages());
@ -561,6 +654,10 @@ public class AiragChatServiceImpl implements IAiragChatService {
// 发消息
sendWithDefault(requestId, chatConversation, topicId, null, messages, null);
}
// 发送就绪消息
EventData eventRequestId = new EventData(requestId, null, EventData.EVENT_INIT_REQUEST_ID, chatConversation.getId(), topicId);
eventRequestId.setData(EventMessageData.builder().message("").build());
sendMessage2Client(emitter, eventRequestId);
} catch (Throwable e) {
log.error(e.getMessage(), e);
EventData eventData = new EventData(requestId, null, EventData.EVENT_FLOW_ERROR, chatConversation.getId(), topicId);
@ -725,6 +822,10 @@ public class AiragChatServiceImpl implements IAiragChatService {
if (null == aiChatParams) {
aiChatParams = new AIChatParams();
}
// 如果是默认app,加载系统默认工具
if(chatConversation.getApp().getId().equals(AiAppConsts.DEFAULT_APP_ID)){
aiChatParams.setTools(jeecgToolsProvider.getDefaultTools());
}
aiChatParams.setKnowIds(chatConversation.getApp().getKnowIds());
aiChatParams.setMaxMsgNumber(oConvertUtils.getInt(chatConversation.getApp().getMsgNum(), 5));
HttpServletRequest httpRequest = SpringContextUtils.getHttpServletRequest();
@ -739,6 +840,19 @@ public class AiragChatServiceImpl implements IAiragChatService {
}
} catch (Exception e) {
log.error(e.getMessage(), e);
// sse
SseEmitter emitter = AiragLocalCache.get(AiragConsts.CACHE_TYPE_SSE, requestId);
if (null == emitter) {
log.warn("[AI应用]接收LLM返回会话已关闭{}", requestId);
return;
}
String errMsg = "调用大模型接口失败,详情请查看后台日志。";
if(e instanceof JeecgBootException){
errMsg = e.getMessage();
}
EventData eventData = new EventData(requestId, null, EventData.EVENT_FLOW_ERROR, chatConversation.getId(), topicId);
eventData.setData(EventFlowData.builder().success(false).message(errMsg).build());
closeSSE(emitter, eventData);
throw new JeecgBootBizTipException("调用大模型接口失败:" + e.getMessage());
}
/**
@ -808,7 +922,7 @@ public class AiragChatServiceImpl implements IAiragChatService {
// 异常结束
log.error("调用模型异常:" + respText);
if (respText.contains("insufficient Balance")) {
respText = "大言模型账号余额不足!";
respText = "大言模型账号余额不足!";
}
EventData eventData = new EventData(requestId, null, EventData.EVENT_FLOW_ERROR, chatConversation.getId(), topicId);
eventData.setData(EventFlowData.builder().success(false).message(respText).build());
@ -837,6 +951,14 @@ public class AiragChatServiceImpl implements IAiragChatService {
//update-end---author:chenrui ---date:20250425 for[QQYUN-12203]AI 聊天,超时或者服务器报错,给个友好提示------------
} else {
errMsg = "调用大模型接口失败,详情请查看后台日志。";
// 根据常见异常关键字做细致翻译
for (Map.Entry<String, String> entry : AIChatHandler.MODEL_ERROR_MAP.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (error.getMessage().contains(key)) {
errMsg = value;
}
}
EventData eventData = new EventData(requestId, null, EventData.EVENT_FLOW_ERROR, chatConversation.getId(), topicId);
eventData.setData(EventFlowData.builder().success(false).message(errMsg).build());
closeSSE(emitter, eventData);
@ -858,6 +980,12 @@ public class AiragChatServiceImpl implements IAiragChatService {
String eventStr = JSONObject.toJSONString(eventData);
log.debug("[AI应用]接收LLM返回消息:{}", eventStr);
emitter.send(SseEmitter.event().data(eventStr));
List<EventData> historyMsg = AiragLocalCache.get(AiragConsts.CACHE_TYPE_SSE_HISTORY_MSG, eventData.getRequestId());
if (null == historyMsg) {
historyMsg = new CopyOnWriteArrayList<>();
AiragLocalCache.put(AiragConsts.CACHE_TYPE_SSE_HISTORY_MSG, eventData.getRequestId(), historyMsg);
}
historyMsg.add(eventData);
} catch (IOException e) {
log.error("发送消息失败", e);
}

View File

@ -15,6 +15,7 @@ import org.jeecg.common.system.base.controller.JeecgController;
import org.jeecg.common.system.query.QueryGenerator;
import org.jeecg.common.util.AssertUtils;
import org.jeecg.common.util.TokenUtils;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.config.mybatis.MybatisPlusSaasConfig;
import org.jeecg.modules.airag.llm.consts.LLMConsts;
import org.jeecg.modules.airag.llm.entity.AiragModel;
@ -77,6 +78,11 @@ public class AiragModelController extends JeecgController<AiragModel, IAiragMode
AssertUtils.assertNotEmpty("模型名称不能为空", airagModel.getName());
AssertUtils.assertNotEmpty("模型类型不能为空", airagModel.getModelType());
AssertUtils.assertNotEmpty("基础模型不能为空", airagModel.getModelName());
// 默认未激活
if(oConvertUtils.isObjectEmpty(airagModel.getActivateFlag())){
airagModel.setActivateFlag(0);
}
airagModel.setActivateFlag(0);
airagModelService.save(airagModel);
return Result.OK("添加成功!");
}
@ -164,7 +170,7 @@ public class AiragModelController extends JeecgController<AiragModel, IAiragMode
AssertUtils.assertNotEmpty("基础模型不能为空", airagModel.getModelName());
try {
if(LLMConsts.MODEL_TYPE_LLM.equals(airagModel.getModelType())){
aiChatHandler.completions(airagModel, Collections.singletonList(UserMessage.from("test connection")), null);
aiChatHandler.completions(airagModel, Collections.singletonList(UserMessage.from("To test whether it can be successfully called, simply return success")), null);
}else{
AiModelOptions aiModelOptions = EmbeddingHandler.buildModelOptions(airagModel);
EmbeddingModel embeddingModel = AiModelFactory.createEmbeddingModel(aiModelOptions);
@ -172,9 +178,12 @@ public class AiragModelController extends JeecgController<AiragModel, IAiragMode
}
}catch (Exception e){
log.error("测试模型连接失败", e);
return Result.error("测试模型连接失败" + e.getMessage());
return Result.error("测试模型连接失败" + e.getMessage());
}
return Result.OK("测试模型连接成功");
// 测试成功激活数据
airagModel.setActivateFlag(1);
airagModelService.updateById(airagModel);
return Result.OK("");
}
}

View File

@ -121,4 +121,11 @@ public class AiragModel implements Serializable {
@Excel(name = "模型参数", width = 15)
@Schema(description = "模型参数")
private String modelParams;
/**
* (0=,1=)
*/
@Excel(name = "是否激活", width = 15)
@Schema(description = "是否激活")
private Integer activateFlag;
}

View File

@ -6,6 +6,7 @@ import dev.langchain4j.rag.query.router.QueryRouter;
import dev.langchain4j.service.TokenStream;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.ai.handler.LLMHandler;
import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.util.AssertUtils;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.modules.airag.common.handler.AIChatParams;
@ -22,9 +23,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.*;
import java.util.regex.Matcher;
/**
@ -83,6 +82,7 @@ public class AIChatHandler implements IAIChatHandler {
AssertUtils.assertNotEmpty("请选择模型", modelId);
AiragModel airagModel = airagModelMapper.getByIdIgnoreTenant(modelId);
AssertUtils.assertSame("模型未激活,请先在[AI模型配置]中[测试激活]模型", airagModel.getActivateFlag(), 1);
return completions(airagModel, messages, params);
}
@ -98,7 +98,25 @@ public class AIChatHandler implements IAIChatHandler {
*/
public String completions(AiragModel airagModel, List<ChatMessage> messages, AIChatParams params) {
params = mergeParams(airagModel, params);
String resp = llmHandler.completions(messages, params);
String resp;
try {
resp = llmHandler.completions(messages, params);
} catch (Exception e) {
// langchain4j 异常友好提示
String errMsg = "调用大模型接口失败,详情请查看后台日志。";
if (oConvertUtils.isNotEmpty(e.getMessage())) {
// // 根据常见异常关键字做细致翻译
// for (Map.Entry<String, String> entry : MODEL_ERROR_MAP.entrySet()) {
// String key = entry.getKey();
// String value = entry.getValue();
// if (errMsg.contains(key)) {
// errMsg = value;
// }
// }
}
log.error("AI模型调用异常: {}", errMsg, e);
throw new JeecgBootException(errMsg);
}
if (resp.contains("</think>")
&& (null == params.getNoThinking() || params.getNoThinking())) {
String[] thinkSplit = resp.split("</think>");
@ -151,6 +169,7 @@ public class AIChatHandler implements IAIChatHandler {
AssertUtils.assertNotEmpty("请选择模型", modelId);
AiragModel airagModel = airagModelMapper.getByIdIgnoreTenant(modelId);
AssertUtils.assertSame("模型未激活,请先在[AI模型配置]中[测试激活]模型", airagModel.getActivateFlag(), 1);
return chat(airagModel, messages, params);
}

View File

@ -0,0 +1,42 @@
package org.jeecg.modules.airag.llm.handler;
import dev.langchain4j.agent.tool.ToolSpecification;
import dev.langchain4j.service.tool.ToolExecutor;
import lombok.Getter;
import java.util.Map;
/**
* for [QQYUN-13565]AI
* @Description: jeecg llm
* @Author: chenrui
* @Date: 2025/8/26 18:06
*/
public interface JeecgToolsProvider {
/**
*
* @return
* @author chenrui
* @date 2025/8/27 09:49
*/
public Map<ToolSpecification, ToolExecutor> getDefaultTools();
/**
* jeecgLlm
* @author chenrui
* @date 2025/8/27 09:49
*/
@Getter
class JeecgLlmTools{
ToolSpecification toolSpecification;
ToolExecutor toolExecutor;
public JeecgLlmTools(ToolSpecification toolSpecification, ToolExecutor toolExecutor) {
this.toolSpecification = toolSpecification;
this.toolExecutor = toolExecutor;
}
}
}