From b347b0c2e71ebea371ae8465cd036e6179c0bbd0 Mon Sep 17 00:00:00 2001 From: Xue YANG Date: Fri, 2 May 2025 10:00:12 +0200 Subject: [PATCH] feat:Batch Edit Weight with WebSocket --- .../controller/admin/SkuWeightController.java | 26 +++++- .../impl/SkuListMabangServiceImpl.java | 87 +++++++++++++++---- .../message/websocket/WebSocketSender.java | 35 ++++++++ 3 files changed, 129 insertions(+), 19 deletions(-) create mode 100644 jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/message/websocket/WebSocketSender.java diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/controller/admin/SkuWeightController.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/controller/admin/SkuWeightController.java index 4f7ade778..08861b12f 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/controller/admin/SkuWeightController.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/controller/admin/SkuWeightController.java @@ -5,6 +5,8 @@ import java.io.InputStream; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -71,6 +73,7 @@ public class SkuWeightController extends JeecgController skuWeights = new ArrayList<>(skuWeightsMap.values()); + ExecutorService executor = Executors.newFixedThreadPool(DEFAULT_NUMBER_OF_THREADS); + executor.submit(() -> { + try { + ResponsesWithMsg responses = skuListMabangService.mabangSkuWeightUpdate(skuWeights); + List skuWeightSuccesses = new ArrayList<>(); + responses.getSuccesses().forEach((skuErpCode, messages) -> { + skuWeightSuccesses.add(skuWeightsMap.get(skuErpCode)); + }); + // Update Mongo + save to database + skuWeightSuccesses.forEach(skuWeight -> skuMongoService.upsertSkuWeight(skuWeight)); + skuWeightService.saveBatch(skuWeights); + } catch (Exception e) { + log.error("Asynchronous updateBatch processing failed", e); + } + }); + + return Result.OK("Batch Edit task has been submitted, please check progress later"); + /////////////////////////////////////////////////// + /** + List skuWeights = new ArrayList<>(skuWeightsMap.values()); ResponsesWithMsg responses = skuListMabangService.mabangSkuWeightUpdate(skuWeights); List skuWeightSuccesses = new ArrayList<>(); responses.getSuccesses().forEach((skuErpCode, messages) -> { @@ -354,6 +378,6 @@ public class SkuWeightController extends JeecgController skuMongoService.upsertSkuWeight(skuWeight)); skuWeightService.saveBatch(skuWeights); - return Result.OK(responses); + return Result.OK(responses);*/ } } diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/service/impl/SkuListMabangServiceImpl.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/service/impl/SkuListMabangServiceImpl.java index b219dd653..d223dac24 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/service/impl/SkuListMabangServiceImpl.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/service/impl/SkuListMabangServiceImpl.java @@ -1,9 +1,13 @@ package org.jeecg.modules.business.service.impl; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.google.common.collect.Lists; +import com.jeecg.qywx.api.conversation.ConversationAPI; import freemarker.template.Template; import lombok.extern.slf4j.Slf4j; +import org.apache.shiro.SecurityUtils; +import org.jeecg.common.system.vo.LoginUser; import org.jeecg.common.util.SpringContextUtils; import org.jeecg.modules.business.domain.api.mabang.Response; import org.jeecg.modules.business.domain.api.mabang.doSearchSkuListNew.*; @@ -24,6 +28,7 @@ import org.jeecg.modules.business.mongoService.SkuMongoService; import org.jeecg.modules.business.service.*; import org.jeecg.modules.business.vo.ResponsesWithMsg; import org.jeecg.modules.business.vo.SkuOrderPage; +import org.jeecg.modules.message.websocket.WebSocketSender; import org.jeecg.modules.online.cgform.mapper.OnlCgformFieldMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; @@ -42,6 +47,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -76,9 +82,10 @@ public class SkuListMabangServiceImpl extends ServiceImpl mabangSkuWeightUpdate(List skuWeights) { ResponsesWithMsg responses = new ResponsesWithMsg<>(); + String userId = ((LoginUser) SecurityUtils.getSubject().getPrincipal()).getId(); List skuIds = skuWeights.stream() .map(SkuWeight::getSkuId) .collect(toList()); @@ -778,33 +786,76 @@ public class SkuListMabangServiceImpl extends ServiceImpl> futures = skuDataList.stream() - .map(skuData -> CompletableFuture.supplyAsync(() -> { + // Batch processing + WebSocket push + ExecutorService executor = ThrottlingExecutorService + .createExecutorService(DEFAULT_NUMBER_OF_THREADS, MABANG_API_RATE_LIMIT_PER_MINUTE, TimeUnit.MINUTES); + + List> futures = new ArrayList<>(); + // Used to count the number of processed data + AtomicInteger processedCounter = new AtomicInteger(0); + // Total number of data + int totalCount = skuDataList.size(); + List> batchList = Lists.partition(skuDataList, 10); + + // Define how many data to process before pushing progress once, at least once + int step = Math.max(1, totalCount / 10); + log.info("Total count: {}, push progress every {} items", totalCount, step); + + for (List batch : batchList) { + for (SkuData skuData : batch) { + futures.add(CompletableFuture.runAsync(() -> { try { SkuChangeRequestBody body = new SkuChangeRequestBody(skuData); SkuChangeRequest request = new SkuChangeRequest(body); - return request.send(); + SkuChangeResponse response = request.send(); + + if(response.success()) { + responses.addSuccess(response.getStockSku(), "Mabang"); + } + else { + responses.addFailure(response.getStockSku(), "Mabang"); + } } catch (Exception e) { log.error("Error updating weight for sku {} : {}", skuData.getErpCode(), e.getMessage()); - return new SkuChangeResponse(Response.Code.ERROR, null, null, skuData.getErpCode()); + responses.addFailure(skuData.getErpCode(), "Exception: " + e.getMessage()); + } finally { + int currentProcessed = processedCounter.incrementAndGet(); + // Only push progress if processed enough items (step) or finished all + if (currentProcessed % step == 0 || currentProcessed == totalCount) { + JSONObject progressMsg = new JSONObject(); + progressMsg.put("cmd", "user"); + progressMsg.put("msgTxt", "已处理 " + currentProcessed + " / " + totalCount + " 条 SKU 更新任务"); + WebSocketSender.sendToUser(userId, progressMsg.toJSONString()); + } } - }, executor)) - .collect(toList()); - List results = futures.stream().map(CompletableFuture::join).collect(toList()); - long successCount = results.stream().filter(SkuChangeResponse::success).count(); - log.info("{}/{} skus updated successfully.", successCount, skuDataList.size()); - results.forEach(response -> { - if(response.success()) { - responses.addSuccess(response.getStockSku(), "Mabang"); + }, executor)); } - else { - responses.addFailure(response.getStockSku(), "Mabang"); - } - }); + } + // Wait for all batches to complete + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + // After all tasks are completed, calculate success and failure counts and push completion message + int successCount = responses.getSuccesses().size(); + int failureCount = responses.getFailures().size(); + + JSONObject doneMsg = new JSONObject(); + doneMsg.put("cmd", "user"); + doneMsg.put("msgTxt", "SKU 重量更新全部完成!"); + doneMsg.put("data", new JSONObject() {{ + put("total", skuDataList.size()); + put("success", successCount); + put("failure", failureCount); + }}); + + // Send the final result to frontend via WebSocket + WebSocketSender.sendToUser(userId, doneMsg.toJSONString()); + + log.info("SKU Weight update completed: total {} items processed, {} succeeded, {} failed", skuDataList.size(), successCount, failureCount); return responses; } + + @Override public List fetchUnpairedSkus(List stockSkuList) { List> skusPartition = Lists.partition(new ArrayList<>(stockSkuList), 50); diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/message/websocket/WebSocketSender.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/message/websocket/WebSocketSender.java new file mode 100644 index 000000000..84c7d5ef2 --- /dev/null +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/message/websocket/WebSocketSender.java @@ -0,0 +1,35 @@ +package org.jeecg.modules.message.websocket; + +import com.alibaba.fastjson.JSONObject; +import org.jeecg.common.base.BaseMap; +import org.jeecg.common.modules.redis.client.JeecgRedisClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * WebSocket 发送器工具类(使用 redis 推送) + */ +@Component +public class WebSocketSender { + + private static JeecgRedisClient jeecgRedisClient; + + @Autowired + public void setRedisClient(JeecgRedisClient client) { + WebSocketSender.jeecgRedisClient = client; + } + + public static void sendToUser(String userId, String message) { + BaseMap baseMap = new BaseMap(); + baseMap.put("userId", userId); + baseMap.put("message", message); + jeecgRedisClient.sendMessage(WebSocket.REDIS_TOPIC_NAME, baseMap); + } + + public static void sendJsonToUser(String userId, String cmd, String msgTxt) { + JSONObject json = new JSONObject(); + json.put("cmd", cmd); + json.put("msgTxt", msgTxt); + sendToUser(userId, json.toJSONString()); + } +}