Apply a throttler of 40 requests per minute to ShopifySyncJob

pull/6221/head
Qiuyi LI 2024-03-05 15:53:11 +01:00
parent c39581ca61
commit 188c5601bd
1 changed files with 6 additions and 3 deletions

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -33,6 +34,7 @@ public class ShopifySyncJob implements Job {
private static final List<String> DEFAULT_INCLUDED_SHOPS = Arrays.asList("JCH3", "JCH4", "JCH5"); private static final List<String> DEFAULT_INCLUDED_SHOPS = Arrays.asList("JCH3", "JCH4", "JCH5");
private static final Integer DEFAULT_NUMBER_OF_THREADS = 10; private static final Integer DEFAULT_NUMBER_OF_THREADS = 10;
private static final Integer SHOPIFY_API_RATE_LIMIT_PER_SHOP_PER_MINUTE = 40;
@Autowired @Autowired
private IPlatformOrderService platformOrderService; private IPlatformOrderService platformOrderService;
@ -66,7 +68,8 @@ public class ShopifySyncJob implements Job {
List<FulfillmentOrder> fulfillmentOrders = new ArrayList<>(); List<FulfillmentOrder> fulfillmentOrders = new ArrayList<>();
ExecutorService executor = Executors.newFixedThreadPool(DEFAULT_NUMBER_OF_THREADS); ExecutorService throttlingExecutorService = ThrottlingExecutorService.createExecutorService(DEFAULT_NUMBER_OF_THREADS,
SHOPIFY_API_RATE_LIMIT_PER_SHOP_PER_MINUTE, TimeUnit.MINUTES);
log.info("Constructing fulfillment retrieval requests"); log.info("Constructing fulfillment retrieval requests");
List<GetFulfillmentRequestBody> getFulfillmentRequestBodyList = new ArrayList<>(); List<GetFulfillmentRequestBody> getFulfillmentRequestBodyList = new ArrayList<>();
ordersReadyForShopifySync.forEach(o -> getFulfillmentRequestBodyList.add(new GetFulfillmentRequestBody(o))); ordersReadyForShopifySync.forEach(o -> getFulfillmentRequestBodyList.add(new GetFulfillmentRequestBody(o)));
@ -88,7 +91,7 @@ public class ShopifySyncJob implements Job {
log.error("Error processing json", e); log.error("Error processing json", e);
} }
return success; return success;
}, executor)) }, throttlingExecutorService))
.collect(Collectors.toList()); .collect(Collectors.toList());
List<Boolean> results = futures.stream().map(CompletableFuture::join).collect(Collectors.toList()); List<Boolean> results = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
long nbSuccesses = results.stream().filter(b -> b).count(); long nbSuccesses = results.stream().filter(b -> b).count();
@ -120,7 +123,7 @@ public class ShopifySyncJob implements Job {
log.error("Error processing json", e); log.error("Error processing json", e);
} }
return success; return success;
}, executor)) }, throttlingExecutorService))
.collect(Collectors.toList()); .collect(Collectors.toList());
results = fulfillmentCreationFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()); results = fulfillmentCreationFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
nbSuccesses = results.stream().filter(b -> b).count(); nbSuccesses = results.stream().filter(b -> b).count();