From bcf57915cc76f6c79e73968367acdb40d598d31a Mon Sep 17 00:00:00 2001 From: Qiuyi LI Date: Thu, 14 Sep 2023 16:12:40 +0200 Subject: [PATCH] Add thread executor service with throttling --- .../business/domain/job/IntervalWindow.java | 205 ++++++++++++++++++ .../domain/job/ThrottlingExecutorService.java | 166 ++++++++++++++ 2 files changed, 371 insertions(+) create mode 100644 jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/domain/job/IntervalWindow.java create mode 100644 jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/domain/job/ThrottlingExecutorService.java diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/domain/job/IntervalWindow.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/domain/job/IntervalWindow.java new file mode 100644 index 000000000..778e87ea7 --- /dev/null +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/domain/job/IntervalWindow.java @@ -0,0 +1,205 @@ +package org.jeecg.modules.business.domain.job; + +import java.util.LinkedHashMap; +import java.util.Map; +/** + * Extends {@code LinkedHashMap} into a fixed sized ordered cache + * for allocating and tracking limited resources in intervals. It also + * tracks the allocation rate as a moving average. + *

+ * The {@code IntervalWindow} is created with a cache that consists of the + * present interval and at least one past interval. + * As the number of cached intervals exceed the windows size, they are + * removed and the moving average updated. The allocation method is + * thread-safe to ensure over allocation is avoided. + * + * @author martinb + * @since 2015-08-17 + */ +public class IntervalWindow extends LinkedHashMap +{ + /** + * Serial + */ + private static final long serialVersionUID = 201508171315L; + + /** + * The upper execution limit per interval + */ + private final int INTERVAL_LIMIT; + + /** + * Number of intervals to track + */ + private final int INTERVAL_WINDOW_SIZE; + + /** + * The current interval being filled. + */ + private long currentInterval = 0; + + /** + * The moving total of slots used in the window + */ + private int slotsUsedInWindow = 0; + + /** + * The minimum interval index that can be considered. + */ + private long minimumInterval = 0; + + + /** + * Returns the value in the map, or a default. + * Implemented in JSE8 + * + * @param key + * @param defaultValue + * @return the value + */ + private final int getOrDefault(Long key, + Integer defaultValue) + { + if (get(key) != null) + { + return get(key); + } + return defaultValue; + } + + + /** + * Decreases the running total by the number of slots used in the + * interval leaving the moving window. + *

+ * The value in map is the number of free slots left in the interval. + * + * @see java.util.LinkedHashMap#removeEldestEntry(java.util.Map.Entry) + */ + protected boolean removeEldestEntry(Map.Entry eldest) + { + if (INTERVAL_WINDOW_SIZE < size()) + { + slotsUsedInWindow -= (INTERVAL_LIMIT - eldest.getValue()); + minimumInterval = eldest.getKey(); + return true; + } + return false; + } + + + /** + * Tries to allocate a slot in the given interval within the rate limit. + * + * @param interval + * the interval + * @return true is a slot was allocated in the interval + */ + public boolean allocateSlot(long interval) + { + boolean isSlotAllocated = false; + int freeSlots = 0; // Free slots in the interval + + if (interval > minimumInterval) + /* + * Cheap range check is OK + */ + { + synchronized (this) + /* + * Synchronize allocate on this object to ensure that cache is consistent + */ + { + if ((freeSlots = getOrDefault(interval, + INTERVAL_LIMIT)) > 0) + /* + * There are free slots in this interval to execute this thread + * Break out of the loop and return. + */ + { + if (currentInterval > 0 && currentInterval != interval) + /* + * Update the running total of slots used in window + * with past values only once past the first interval. + */ + { + slotsUsedInWindow += + INTERVAL_LIMIT + - getOrDefault(currentInterval, + 0); + } + + put(currentInterval = interval, freeSlots - 1); // Maximum is RATE_LIMIT - 1 + isSlotAllocated = true; + } // if + } // synchronized + } // if + + return isSlotAllocated; + } + + + /** + * Returns the moving average number of slots allocated for work during + * the present window but excluding the currently filling interval + * + * @return the average number of slots used + */ + public float getAverageSlotUsed() + { + return slotsUsedInWindow / (INTERVAL_WINDOW_SIZE - 1); + } + + + /** + * Check window size parameters for range. + * + * @param intervalWindowSize + * the proposed window size + * @return the window size + */ + private static int checkWindowSize(int intervalWindowSize) + { + if (intervalWindowSize < 2) + { + throw new IllegalArgumentException( + "Interval Window Size cannot be smaller than 2"); + } + return intervalWindowSize; + + } + + + /** + * Creates an {@code IntervalWindow} of a window size of two that limits the + * number of successful allocations in each interval. + * + * @param intervalLimit + * the maximum number of allocations per interval. + */ + public IntervalWindow(int intervalLimit) + { + super(2, 1); + INTERVAL_WINDOW_SIZE = 2; + INTERVAL_LIMIT = intervalLimit; + } + + + /** + * Creates an {@code IntervalWindow} of a given window size that limits the + * number of successful allocations in each interval. + * + * @param intervalLimit + * the maximum number of allocations per interval. + * @param intervalWindow + * the number if intervals to track, must be at least two + */ + public IntervalWindow(int intervalLimit, int intervalWindow) + { + super(checkWindowSize(intervalWindow), + 1); + INTERVAL_WINDOW_SIZE = intervalWindow; + INTERVAL_LIMIT = intervalLimit; + } + +} \ No newline at end of file diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/domain/job/ThrottlingExecutorService.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/domain/job/ThrottlingExecutorService.java new file mode 100644 index 000000000..b30e37511 --- /dev/null +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/business/domain/job/ThrottlingExecutorService.java @@ -0,0 +1,166 @@ +package org.jeecg.modules.business.domain.job; + +import static java.lang.Integer.MAX_VALUE; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * A {@code ExecutorService} that throttles the amount of work released + * for execution per time period. + * + * @author martinb + * @since 2015-08-17 + */ +public class ThrottlingExecutorService extends ThreadPoolExecutor { + /** + * The interval window cache + */ + private final IntervalWindow INTERVAL_WINDOW; + + /** + * The rate limit interval in milliseconds + */ + private final long RATE_INTERVAL_MILLISECONDS; + + + /** + * Caching, dynamic rate limiting {@code ExecutorService} + * + * @param rateLimit + * the rate limit + * @param unit + * the rate limit time unit + */ + private ThrottlingExecutorService( + int rateLimit, + TimeUnit unit) + { + /* + * Create a CACHING ExecutorService + */ + super(0, MAX_VALUE, + 60L, SECONDS, + new SynchronousQueue()); + + INTERVAL_WINDOW = new IntervalWindow(rateLimit); + RATE_INTERVAL_MILLISECONDS = unit.toMillis(1); + } + + + /** + * Fixed size rate limiting {@code ExecutorService} + * + * @param parallelism + * @param rateLimit + * @param unit + */ + private ThrottlingExecutorService(int parallelism, + int rateLimit, + TimeUnit unit) + { + /* + * Create a FIXED ExecutorService + */ + super(parallelism, parallelism, 0, MILLISECONDS, + new LinkedBlockingQueue()); + + INTERVAL_WINDOW = new IntervalWindow(rateLimit); + RATE_INTERVAL_MILLISECONDS = unit.toMillis(1); + } + + + /** + * Produces a throttling ExecutorService + *

+ * Evaluates the parameters and generates an appropriate ExecutorService + * + * @param parallelism + * how many threads + * @param rateLimit + * work per time unit + * @param unit + * the time unit + * @return the ExecutorService + */ + public static ExecutorService createExecutorService(int parallelism, + int rateLimit, + TimeUnit unit) + { + if (parallelism > 0) + /* + * Fixed ExecutorService + */ + { + return new ThrottlingExecutorService(parallelism, + rateLimit > 0 ? rateLimit : MAX_VALUE, + unit); + } + else + /* + * Caching ExecutorService + */ + { + return new ThrottlingExecutorService( + rateLimit > 0 ? rateLimit : MAX_VALUE, + unit); + } + } + + + /** + * Throttles the execution before executing the task to achieve the desired + * rate. + * + * @see java.util.concurrent.ThreadPoolExecutor#execute(java.lang.Runnable) + */ + @Override + public void execute(final Runnable task) + { + throttle(); + super.execute(task); + } + + + /** + * Throttles if the thread can not be allocated in the current time + * interval, + * forcing it to wait to the next interval. + */ + private void throttle() + { + long interval = 0; // The interval index + long milliTime = System.currentTimeMillis(); // The current time + long offset = milliTime % RATE_INTERVAL_MILLISECONDS; // Interval offset + + while (!INTERVAL_WINDOW.allocateSlot( + (interval = (milliTime + offset) / RATE_INTERVAL_MILLISECONDS))) + /* + * Cannot allocate free slots in this interval. + * Calculate the required pause to get to the next interval and sleep + */ + { + int pause = (int) (((interval + 1) + * RATE_INTERVAL_MILLISECONDS) + - milliTime + offset); + try + /* + * Try to sleep the thread for a pause of nanoseconds + */ + { + Thread.sleep(pause); + } + catch (InterruptedException e) + { + } + + milliTime = System.currentTimeMillis(); + + } // while + } +}