feat:Batch Edit Weight with WebSocket

pull/8523/head
Xue YANG 2025-05-02 10:00:12 +02:00
parent 55eea18803
commit b347b0c2e7
3 changed files with 129 additions and 19 deletions

View File

@ -5,6 +5,8 @@ import java.io.InputStream;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -71,6 +73,7 @@ public class SkuWeightController extends JeecgController<SkuWeight, ISkuWeightSe
@Resource @Resource
private JeecgBaseConfig jeecgBaseConfig; private JeecgBaseConfig jeecgBaseConfig;
private static final Integer DEFAULT_NUMBER_OF_THREADS = 1;
private final static Integer NUMBER_OF_SKU_EXCEL_COLUMNS = 3; private final static Integer NUMBER_OF_SKU_EXCEL_COLUMNS = 3;
private final DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private final DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@ -345,6 +348,27 @@ public class SkuWeightController extends JeecgController<SkuWeight, ISkuWeightSe
skuWeight.setWeight(param.getWeight()); skuWeight.setWeight(param.getWeight());
skuWeightsMap.put(sku.getErpCode(), skuWeight); skuWeightsMap.put(sku.getErpCode(), skuWeight);
} }
// Submit task asynchronously
List<SkuWeight> skuWeights = new ArrayList<>(skuWeightsMap.values());
ExecutorService executor = Executors.newFixedThreadPool(DEFAULT_NUMBER_OF_THREADS);
executor.submit(() -> {
try {
ResponsesWithMsg<String> responses = skuListMabangService.mabangSkuWeightUpdate(skuWeights);
List<SkuWeight> skuWeightSuccesses = new ArrayList<>();
responses.getSuccesses().forEach((skuErpCode, messages) -> {
skuWeightSuccesses.add(skuWeightsMap.get(skuErpCode));
});
// Update Mongo + save to database
skuWeightSuccesses.forEach(skuWeight -> skuMongoService.upsertSkuWeight(skuWeight));
skuWeightService.saveBatch(skuWeights);
} catch (Exception e) {
log.error("Asynchronous updateBatch processing failed", e);
}
});
return Result.OK("Batch Edit task has been submitted, please check progress later");
///////////////////////////////////////////////////
/**
List<SkuWeight> skuWeights = new ArrayList<>(skuWeightsMap.values()); List<SkuWeight> skuWeights = new ArrayList<>(skuWeightsMap.values());
ResponsesWithMsg<String> responses = skuListMabangService.mabangSkuWeightUpdate(skuWeights); ResponsesWithMsg<String> responses = skuListMabangService.mabangSkuWeightUpdate(skuWeights);
List<SkuWeight> skuWeightSuccesses = new ArrayList<>(); List<SkuWeight> skuWeightSuccesses = new ArrayList<>();
@ -354,6 +378,6 @@ public class SkuWeightController extends JeecgController<SkuWeight, ISkuWeightSe
skuWeightSuccesses.forEach(skuWeight -> skuMongoService.upsertSkuWeight(skuWeight)); skuWeightSuccesses.forEach(skuWeight -> skuMongoService.upsertSkuWeight(skuWeight));
skuWeightService.saveBatch(skuWeights); skuWeightService.saveBatch(skuWeights);
return Result.OK(responses); return Result.OK(responses);*/
} }
} }

View File

@ -1,9 +1,13 @@
package org.jeecg.modules.business.service.impl; package org.jeecg.modules.business.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.jeecg.qywx.api.conversation.ConversationAPI;
import freemarker.template.Template; import freemarker.template.Template;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.SecurityUtils;
import org.jeecg.common.system.vo.LoginUser;
import org.jeecg.common.util.SpringContextUtils; import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.business.domain.api.mabang.Response; import org.jeecg.modules.business.domain.api.mabang.Response;
import org.jeecg.modules.business.domain.api.mabang.doSearchSkuListNew.*; import org.jeecg.modules.business.domain.api.mabang.doSearchSkuListNew.*;
@ -24,6 +28,7 @@ import org.jeecg.modules.business.mongoService.SkuMongoService;
import org.jeecg.modules.business.service.*; import org.jeecg.modules.business.service.*;
import org.jeecg.modules.business.vo.ResponsesWithMsg; import org.jeecg.modules.business.vo.ResponsesWithMsg;
import org.jeecg.modules.business.vo.SkuOrderPage; import org.jeecg.modules.business.vo.SkuOrderPage;
import org.jeecg.modules.message.websocket.WebSocketSender;
import org.jeecg.modules.online.cgform.mapper.OnlCgformFieldMapper; import org.jeecg.modules.online.cgform.mapper.OnlCgformFieldMapper;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
@ -42,6 +47,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function; import java.util.function.Function;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -76,9 +82,10 @@ public class SkuListMabangServiceImpl extends ServiceImpl<SkuListMabangMapper, S
@Autowired @Autowired
Environment env; Environment env;
private static final Integer DEFAULT_NUMBER_OF_THREADS = 10; private static final Integer DEFAULT_NUMBER_OF_THREADS = 1;
private static final Integer MABANG_API_RATE_LIMIT_PER_MINUTE = 10; private static final Integer MABANG_API_RATE_LIMIT_PER_MINUTE = 10;
private final static String DEFAULT_WAREHOUSE_NAME = "SZBA宝安仓"; private final static String DEFAULT_WAREHOUSE_NAME = "SZBA宝安仓";
// In NameCN field on top of the product name we also get the customer code in the beginning of the string : "XX Description of the product" // In NameCN field on top of the product name we also get the customer code in the beginning of the string : "XX Description of the product"
@ -687,6 +694,7 @@ public class SkuListMabangServiceImpl extends ServiceImpl<SkuListMabangMapper, S
@Override @Override
public ResponsesWithMsg<String> mabangSkuWeightUpdate(List<SkuWeight> skuWeights) { public ResponsesWithMsg<String> mabangSkuWeightUpdate(List<SkuWeight> skuWeights) {
ResponsesWithMsg<String> responses = new ResponsesWithMsg<>(); ResponsesWithMsg<String> responses = new ResponsesWithMsg<>();
String userId = ((LoginUser) SecurityUtils.getSubject().getPrincipal()).getId();
List<String> skuIds = skuWeights.stream() List<String> skuIds = skuWeights.stream()
.map(SkuWeight::getSkuId) .map(SkuWeight::getSkuId)
.collect(toList()); .collect(toList());
@ -778,33 +786,76 @@ public class SkuListMabangServiceImpl extends ServiceImpl<SkuListMabangMapper, S
.filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(toList()); .collect(toList());
ExecutorService executor = ThrottlingExecutorService.createExecutorService(DEFAULT_NUMBER_OF_THREADS, MABANG_API_RATE_LIMIT_PER_MINUTE, TimeUnit.MINUTES); // Batch processing + WebSocket push
List<CompletableFuture<SkuChangeResponse>> futures = skuDataList.stream() ExecutorService executor = ThrottlingExecutorService
.map(skuData -> CompletableFuture.supplyAsync(() -> { .createExecutorService(DEFAULT_NUMBER_OF_THREADS, MABANG_API_RATE_LIMIT_PER_MINUTE, TimeUnit.MINUTES);
List<CompletableFuture<Void>> futures = new ArrayList<>();
// Used to count the number of processed data
AtomicInteger processedCounter = new AtomicInteger(0);
// Total number of data
int totalCount = skuDataList.size();
List<List<SkuData>> batchList = Lists.partition(skuDataList, 10);
// Define how many data to process before pushing progress once, at least once
int step = Math.max(1, totalCount / 10);
log.info("Total count: {}, push progress every {} items", totalCount, step);
for (List<SkuData> batch : batchList) {
for (SkuData skuData : batch) {
futures.add(CompletableFuture.runAsync(() -> {
try { try {
SkuChangeRequestBody body = new SkuChangeRequestBody(skuData); SkuChangeRequestBody body = new SkuChangeRequestBody(skuData);
SkuChangeRequest request = new SkuChangeRequest(body); SkuChangeRequest request = new SkuChangeRequest(body);
return request.send(); SkuChangeResponse response = request.send();
} catch (Exception e) {
log.error("Error updating weight for sku {} : {}", skuData.getErpCode(), e.getMessage());
return new SkuChangeResponse(Response.Code.ERROR, null, null, skuData.getErpCode());
}
}, executor))
.collect(toList());
List<SkuChangeResponse> results = futures.stream().map(CompletableFuture::join).collect(toList());
long successCount = results.stream().filter(SkuChangeResponse::success).count();
log.info("{}/{} skus updated successfully.", successCount, skuDataList.size());
results.forEach(response -> {
if(response.success()) { if(response.success()) {
responses.addSuccess(response.getStockSku(), "Mabang"); responses.addSuccess(response.getStockSku(), "Mabang");
} }
else { else {
responses.addFailure(response.getStockSku(), "Mabang"); responses.addFailure(response.getStockSku(), "Mabang");
} }
}); } catch (Exception e) {
log.error("Error updating weight for sku {} : {}", skuData.getErpCode(), e.getMessage());
responses.addFailure(skuData.getErpCode(), "Exception: " + e.getMessage());
} finally {
int currentProcessed = processedCounter.incrementAndGet();
// Only push progress if processed enough items (step) or finished all
if (currentProcessed % step == 0 || currentProcessed == totalCount) {
JSONObject progressMsg = new JSONObject();
progressMsg.put("cmd", "user");
progressMsg.put("msgTxt", "已处理 " + currentProcessed + " / " + totalCount + " 条 SKU 更新任务");
WebSocketSender.sendToUser(userId, progressMsg.toJSONString());
}
}
}, executor));
}
}
// Wait for all batches to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// After all tasks are completed, calculate success and failure counts and push completion message
int successCount = responses.getSuccesses().size();
int failureCount = responses.getFailures().size();
JSONObject doneMsg = new JSONObject();
doneMsg.put("cmd", "user");
doneMsg.put("msgTxt", "SKU 重量更新全部完成!");
doneMsg.put("data", new JSONObject() {{
put("total", skuDataList.size());
put("success", successCount);
put("failure", failureCount);
}});
// Send the final result to frontend via WebSocket
WebSocketSender.sendToUser(userId, doneMsg.toJSONString());
log.info("SKU Weight update completed: total {} items processed, {} succeeded, {} failed", skuDataList.size(), successCount, failureCount);
return responses; return responses;
} }
@Override @Override
public List<SkuData> fetchUnpairedSkus(List<String> stockSkuList) { public List<SkuData> fetchUnpairedSkus(List<String> stockSkuList) {
List<List<String>> skusPartition = Lists.partition(new ArrayList<>(stockSkuList), 50); List<List<String>> skusPartition = Lists.partition(new ArrayList<>(stockSkuList), 50);

View File

@ -0,0 +1,35 @@
package org.jeecg.modules.message.websocket;
import com.alibaba.fastjson.JSONObject;
import org.jeecg.common.base.BaseMap;
import org.jeecg.common.modules.redis.client.JeecgRedisClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* WebSocket 使 redis
*/
@Component
public class WebSocketSender {
private static JeecgRedisClient jeecgRedisClient;
@Autowired
public void setRedisClient(JeecgRedisClient client) {
WebSocketSender.jeecgRedisClient = client;
}
public static void sendToUser(String userId, String message) {
BaseMap baseMap = new BaseMap();
baseMap.put("userId", userId);
baseMap.put("message", message);
jeecgRedisClient.sendMessage(WebSocket.REDIS_TOPIC_NAME, baseMap);
}
public static void sendJsonToUser(String userId, String cmd, String msgTxt) {
JSONObject json = new JSONObject();
json.put("cmd", cmd);
json.put("msgTxt", msgTxt);
sendToUser(userId, json.toJSONString());
}
}