Add missing CMK job

pull/6221/head
Qiuyi LI 2023-06-30 10:34:26 +02:00
parent a23a94f3ef
commit c61b5e373c
7 changed files with 232 additions and 0 deletions

View File

@ -0,0 +1,123 @@
package org.jeecg.modules.business.domain.job;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.util.EntityUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.jeecg.modules.business.domain.api.cmk.CMKParcelTraceData;
import org.jeecg.modules.business.domain.api.cmk.CMKRequest;
import org.jeecg.modules.business.domain.api.cmk.CMKResponse;
import org.jeecg.modules.business.service.IParcelService;
import org.jeecg.modules.business.service.IPlatformOrderService;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.sql.Date;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Slf4j
public class CMKJob implements Job {
@Autowired
private IParcelService parcelService;
@Autowired
private IPlatformOrderService platformOrderService;
private static final Integer DEFAULT_NUMBER_OF_DAYS = 15;
private static final Integer DEFAULT_MAXIMUM_NUMBER_OF_PARCELS_PER_TRANSACTION = 800;
private static final List<String> DEFAULT_TRANSPORTERS = Arrays.asList("CMK-JJ-PH 法 美 德", "CMK-DB-PH6000", "CMK-DB-PH18000", "CMK-DB-PH10000");
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
LocalDate endDate = LocalDate.now();
LocalDate startDate = endDate.minusDays(DEFAULT_NUMBER_OF_DAYS);
List<String> transporters = DEFAULT_TRANSPORTERS;
boolean overrideRestriction = false;
ObjectMapper mapper = new ObjectMapper();
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
JobDataMap jobDataMap = context.getMergedJobDataMap();
String parameter = ((String) jobDataMap.get("parameter"));
if (parameter != null) {
try {
JSONObject jsonObject = new JSONObject(parameter);
if (!jsonObject.isNull("startDate")) {
String startDateStr = jsonObject.getString("startDate");
startDate = LocalDate.parse(startDateStr);
}
if (!jsonObject.isNull("endDate")) {
String endDateStr = jsonObject.getString("endDate");
endDate = LocalDate.parse(endDateStr);
}
if (!jsonObject.isNull("transporters")) {
JSONArray transporterArray = jsonObject.getJSONArray("transporters");
List<String> transporterList = new ArrayList<>();
for (int i = 0; i < transporterArray.length(); i++) {
transporterList.add(transporterArray.getString(i));
}
transporters = transporterList;
}
if (!jsonObject.isNull("override")) {
overrideRestriction = jsonObject.getBoolean("override");
}
} catch (JSONException e) {
log.error("Error while parsing parameter as JSON, falling back to default parameters.");
}
}
if (!endDate.isAfter(startDate)) {
throw new RuntimeException("EndDate must be strictly greater than StartDate !");
} else if (endDate.minusDays(30).isAfter(startDate) && !overrideRestriction) {
throw new RuntimeException("No more than 30 days can separate startDate and endDate !");
}
log.info("Starting to retrieve parcel traces of {} from {} to {}", transporters, startDate, endDate);
List<String> billCodes = platformOrderService.fetchBillCodesOfParcelsWithoutTrace(
Date.valueOf(startDate), Date.valueOf(endDate), transporters);
log.info("{} parcels without trace in total", billCodes.size());
List<List<String>> billCodeLists = Lists.partition(billCodes, 10);
log.info("Requests will be divided in to {} parts", billCodeLists.size());
List<CMKParcelTraceData> parcelTraces = new ArrayList<>();
List<CMKRequest> cmkRequests = new ArrayList<>();
billCodeLists.forEach(billcodeList -> {
CMKRequest cmkRequest = new CMKRequest(billcodeList);
cmkRequests.add(cmkRequest);
});
List<Boolean> results = new ArrayList<>();
for (CMKRequest request : cmkRequests) {
boolean success = false;
HttpEntity entity = request.send().getEntity();
try {
// String of the response
String responseString = EntityUtils.toString(entity, "UTF-8");
CMKResponse cmkResponse = mapper.readValue(responseString, CMKResponse.class);
parcelTraces.addAll(cmkResponse.getParcelData());
success = true;
} catch (IOException e) {
log.error("Error while parsing response into String", e);
} finally {
results.add(success);
}
log.info("{} parcel added to the queue to be inserted into DB.", parcelTraces.size());
}
long nbSuccesses = results.stream().filter(b -> b).count();
log.info("{}/{} lots of 10 parcel traces have been retrieved.", nbSuccesses, cmkRequests.size());
List<List<CMKParcelTraceData>> parcelTraceList = Lists.partition(parcelTraces, DEFAULT_MAXIMUM_NUMBER_OF_PARCELS_PER_TRANSACTION);
for (List<CMKParcelTraceData> parcelTracesPerTransaction : parcelTraceList) {
parcelService.saveCMKParcelAndTraces(parcelTracesPerTransaction);
}
}
}

View File

@ -2,6 +2,7 @@ package org.jeecg.modules.business.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import org.jeecg.modules.business.domain.api.cmk.CMKParcelTraceData;
import org.jeecg.modules.business.domain.api.equick.EQuickResponse;
import org.jeecg.modules.business.domain.api.jt.JTParcelTrace;
import org.jeecg.modules.business.domain.api.yd.YDTraceData;
@ -26,5 +27,8 @@ public interface ParcelMapper extends BaseMapper<Parcel> {
void insertOrUpdateEQParcels(List<EQuickResponse> parcels);
void insertOrIgnoreYDParcels(List<YDTraceData> parcels);
void insertOrIgnoreCMKParcels(List<CMKParcelTraceData> parcels);
List<Parcel> fetchParcelsToArchive(@Param("trackingNumbers") List<String> trackingNumbers);
}

View File

@ -2,6 +2,7 @@ package org.jeecg.modules.business.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import org.jeecg.modules.business.domain.api.cmk.CMKParcelTrace;
import org.jeecg.modules.business.domain.api.equick.EQuickTraceData;
import org.jeecg.modules.business.domain.api.jt.JTParcelTraceDetail;
import org.jeecg.modules.business.domain.api.yd.YDTraceDetail;
@ -28,5 +29,8 @@ public interface ParcelTraceMapper extends BaseMapper<ParcelTrace> {
void insertOrUpdateEQTraces(@Param("traces") List<EQuickTraceData> traceDetails);
void insertOrIgnoreYDTraces(@Param("traces") List<YDTraceDetail> traceDetails);
void insertOrIgnoreCMKTraces(@Param("traces") List<CMKParcelTrace> traceDetails);
List<ParcelTrace> fetchParcelTracesToArchive(@Param("parcelIDs") List<String> parcelIDs);
}

View File

@ -75,4 +75,24 @@
#{number}
</foreach>;
</select>
<insert id="insertOrIgnoreCMKParcels" parameterType="list">
INSERT IGNORE INTO parcel(id, create_by, create_time, update_by, update_time, bill_code, country,
third_bill_code, order_no, product_code)
VALUES
<foreach collection="parcels" separator="," open="" close="" item="parcel" index="index">
(
#{parcel.id},
'cmk api',
NOW(),
'cmk api',
NOW(),
#{parcel.thirdBillCode},
#{parcel.country},
#{parcel.thirdBillCode},
#{parcel.detail.orderNo},
#{parcel.detail.productCode}
)
</foreach>
</insert>
</mapper>

View File

@ -80,6 +80,28 @@
)
</foreach>
</insert>
<insert id="insertOrIgnoreCMKTraces" parameterType="list">
INSERT IGNORE INTO parcel_trace(id, create_by, create_time, update_by, update_time, parcel_id, scan_time,
description, description_en, trace_location, scan_type)
VALUES
<foreach collection="traces" separator="," open="" close="" item="trace" index="index">
(
UUID(),
'cmk api',
NOW(),
'cmk api',
NOW(),
#{trace.parcelId},
#{trace.time},
#{trace.description},
#{trace.descriptionEn},
#{trace.location},
#{trace.scanType}
)
</foreach>
</insert>
<select id="fetchParcelTracesToArchive" resultType="org.jeecg.modules.business.entity.Parcel">
SELECT *
FROM parcel_trace

View File

@ -1,6 +1,7 @@
package org.jeecg.modules.business.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.jeecg.modules.business.domain.api.cmk.CMKParcelTraceData;
import org.jeecg.modules.business.domain.api.equick.EQuickResponse;
import org.jeecg.modules.business.domain.api.jt.JTParcelTrace;
import org.jeecg.modules.business.domain.api.yd.YDTraceData;
@ -46,5 +47,8 @@ public interface IParcelService extends IService<Parcel> {
void saveEQParcelAndTraces(List<EQuickResponse> parcelTraces);
void saveYDParcelAndTraces(List<YDTraceData> traceData);
void saveCMKParcelAndTraces(List<CMKParcelTraceData> traceData);
List<Parcel> fetchParcelsToArchive(List<String> trackingNumbers);
}

View File

@ -2,14 +2,19 @@ package org.jeecg.modules.business.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.modules.business.domain.api.cmk.CMKParcelDetail;
import org.jeecg.modules.business.domain.api.cmk.CMKParcelTrace;
import org.jeecg.modules.business.domain.api.cmk.CMKParcelTraceData;
import org.jeecg.modules.business.domain.api.equick.EQuickResponse;
import org.jeecg.modules.business.domain.api.equick.EQuickTraceData;
import org.jeecg.modules.business.domain.api.jt.JTParcelTrace;
import org.jeecg.modules.business.domain.api.jt.JTParcelTraceDetail;
import org.jeecg.modules.business.domain.api.yd.YDTraceData;
import org.jeecg.modules.business.domain.api.yd.YDTraceDetail;
import org.jeecg.modules.business.entity.Country;
import org.jeecg.modules.business.entity.Parcel;
import org.jeecg.modules.business.entity.ParcelTrace;
import org.jeecg.modules.business.mapper.CountryMapper;
import org.jeecg.modules.business.mapper.ParcelMapper;
import org.jeecg.modules.business.mapper.ParcelTraceMapper;
import org.jeecg.modules.business.service.IParcelService;
@ -36,6 +41,8 @@ public class ParcelServiceImpl extends ServiceImpl<ParcelMapper, Parcel> impleme
private ParcelMapper parcelMapper;
@Autowired
private ParcelTraceMapper parcelTraceMapper;
@Autowired
private CountryMapper countryMapper;
@Override
@Transactional
@ -218,6 +225,54 @@ public class ParcelServiceImpl extends ServiceImpl<ParcelMapper, Parcel> impleme
}
log.info("Finished inserting {} parcels and their traces into DB.", parcelTraces.size());
}
@Override
@Transactional
public void saveCMKParcelAndTraces(List<CMKParcelTraceData> parcelTraces) {
if (parcelTraces.isEmpty()) {
return;
}
log.info("Started inserting {} CMK parcels and their traces into DB.", parcelTraces.size() );
List<String> parcelBillCodes = parcelTraces.stream()
.map(CMKParcelTraceData::getThirdBillCode)
.collect(Collectors.toList());
List<Parcel> existingParcels = parcelMapper.searchByBillCode(parcelBillCodes);
Map<String, Parcel> billCodeToExistingParcels = existingParcels.stream().collect(
Collectors.toMap(Parcel::getBillCode, Function.identity())
);
List<Country> countryList = countryMapper.findAll();
Map<String, String> countryNameMap = new HashMap<>();
countryList.forEach(country -> countryNameMap.put(country.getNameZh(), country.getCode()));
List<CMKParcelTraceData> parcelToInsert = new ArrayList<>();
List<CMKParcelTrace> tracesToInsert = new ArrayList<>();
for (CMKParcelTraceData parcelAndTrace : parcelTraces) {
List<CMKParcelTrace> traceDetails = parcelAndTrace.getTraceList();
CMKParcelDetail parcelDetail = parcelAndTrace.getDetail();
if (traceDetails.isEmpty()) {
break;
}
// Country name in detail is in Chinese, have to be converted to country code
parcelAndTrace.setCountry(countryNameMap.get(parcelDetail.getCountry()));
Parcel existingParcel = billCodeToExistingParcels.get(parcelAndTrace.getThirdBillCode());
if (existingParcel == null) {
parcelToInsert.add(parcelAndTrace);
traceDetails.forEach(trace -> trace.parcelTraceProcess(parcelAndTrace.getId()));
} else {
traceDetails.forEach(trace -> trace.parcelTraceProcess(existingParcel.getId()));
}
tracesToInsert.addAll(new ArrayList<>(traceDetails).stream().filter(CMKParcelTrace::isUseful).collect(Collectors.toList()));
}
log.info("After filtering, {} parcels will be inserted into the DB.", parcelToInsert.size());
if (!parcelToInsert.isEmpty()) {
parcelMapper.insertOrIgnoreCMKParcels(parcelToInsert);
}
if (!tracesToInsert.isEmpty()) {
parcelTraceMapper.insertOrIgnoreCMKTraces(tracesToInsert);
}
log.info("Finished inserting {} parcels and their traces into DB.", parcelTraces.size());
}
public List<Parcel> fetchParcelsToArchive(List<String> trackingNumbers) {
return parcelMapper.fetchParcelsToArchive(trackingNumbers);
}