diff --git a/src/main/java/run/halo/app/config/ExtensionConfiguration.java b/src/main/java/run/halo/app/config/ExtensionConfiguration.java index dc78c8547..967553725 100644 --- a/src/main/java/run/halo/app/config/ExtensionConfiguration.java +++ b/src/main/java/run/halo/app/config/ExtensionConfiguration.java @@ -94,7 +94,7 @@ public class ExtensionConfiguration { @Bean Controller userController(ExtensionClient client) { - return new ControllerBuilder("user-controller", client) + return new ControllerBuilder("user", client) .reconciler(new UserReconciler(client)) .extension(new User()) .build(); @@ -102,7 +102,7 @@ public class ExtensionConfiguration { @Bean Controller roleController(ExtensionClient client, RoleService roleService) { - return new ControllerBuilder("role-controller", client) + return new ControllerBuilder("role", client) .reconciler(new RoleReconciler(client, roleService)) .extension(new Role()) .build(); @@ -110,7 +110,7 @@ public class ExtensionConfiguration { @Bean Controller roleBindingController(ExtensionClient client) { - return new ControllerBuilder("role-binding-controller", client) + return new ControllerBuilder("role-binding", client) .reconciler(new RoleBindingReconciler(client)) .extension(new RoleBinding()) .build(); @@ -118,7 +118,7 @@ public class ExtensionConfiguration { @Bean Controller pluginController(ExtensionClient client, HaloPluginManager haloPluginManager) { - return new ControllerBuilder("plugin-controller", client) + return new ControllerBuilder("plugin", client) .reconciler(new PluginReconciler(client, haloPluginManager)) .extension(new Plugin()) .build(); @@ -126,7 +126,7 @@ public class ExtensionConfiguration { @Bean Controller menuController(ExtensionClient client) { - return new ControllerBuilder("menu-controller", client) + return new ControllerBuilder("menu", client) .reconciler(new MenuReconciler(client)) .extension(new Menu()) .build(); @@ -134,7 +134,7 @@ public class ExtensionConfiguration { @Bean Controller menuItemController(ExtensionClient client) { - return new ControllerBuilder("menu-item-controller", client) + return new ControllerBuilder("menu-item", client) .reconciler(new MenuItemReconciler(client)) .extension(new MenuItem()) .build(); @@ -142,7 +142,7 @@ public class ExtensionConfiguration { @Bean Controller themeController(ExtensionClient client, HaloProperties haloProperties) { - return new ControllerBuilder("theme-controller", client) + return new ControllerBuilder("theme", client) .reconciler(new ThemeReconciler(client, haloProperties)) .extension(new Theme()) .build(); @@ -152,18 +152,20 @@ public class ExtensionConfiguration { Controller postController(ExtensionClient client, ContentService contentService, PostPermalinkPolicy postPermalinkPolicy, CounterService counterService, PostService postService) { - return new ControllerBuilder("post-controller", client) + return new ControllerBuilder("post", client) .reconciler(new PostReconciler(client, contentService, postService, postPermalinkPolicy, counterService)) .extension(new Post()) + // TODO Make it configurable + .workerCount(10) .build(); } @Bean Controller categoryController(ExtensionClient client, CategoryPermalinkPolicy categoryPermalinkPolicy) { - return new ControllerBuilder("category-controller", client) + return new ControllerBuilder("category", client) .reconciler(new CategoryReconciler(client, categoryPermalinkPolicy)) .extension(new Category()) .build(); @@ -171,7 +173,7 @@ public class ExtensionConfiguration { @Bean Controller tagController(ExtensionClient client, TagPermalinkPolicy tagPermalinkPolicy) { - return new ControllerBuilder("tag-controller", client) + return new ControllerBuilder("tag", client) .reconciler(new TagReconciler(client, tagPermalinkPolicy)) .extension(new Tag()) .build(); @@ -181,7 +183,7 @@ public class ExtensionConfiguration { Controller systemSettingController(ExtensionClient client, SystemConfigurableEnvironmentFetcher environmentFetcher, ApplicationContext applicationContext) { - return new ControllerBuilder("system-setting-controller", client) + return new ControllerBuilder("system-setting", client) .reconciler(new SystemSettingReconciler(client, environmentFetcher, applicationContext)) .extension(new ConfigMap()) @@ -192,7 +194,7 @@ public class ExtensionConfiguration { Controller attachmentController(ExtensionClient client, ExtensionComponentsFinder extensionComponentsFinder, ExternalUrlSupplier externalUrl) { - return new ControllerBuilder("attachment-controller", client) + return new ControllerBuilder("attachment", client) .reconciler( new AttachmentReconciler(client, extensionComponentsFinder, externalUrl)) .extension(new Attachment()) @@ -203,7 +205,7 @@ public class ExtensionConfiguration { Controller singlePageController(ExtensionClient client, ContentService contentService, ApplicationContext applicationContext, CounterService counterService, SinglePageService singlePageService, ExternalUrlSupplier externalUrlSupplier) { - return new ControllerBuilder("single-page-controller", client) + return new ControllerBuilder("single-page", client) .reconciler(new SinglePageReconciler(client, contentService, applicationContext, singlePageService, counterService, externalUrlSupplier) ) @@ -214,7 +216,9 @@ public class ExtensionConfiguration { @Bean Controller commentController(ExtensionClient client, MeterRegistry meterRegistry, SchemeManager schemeManager) { - return new ControllerBuilder("comment-controller", client) + return new ControllerBuilder("comment", client) + // TODO Make it configurable + .workerCount(10) .reconciler(new CommentReconciler(client, meterRegistry, schemeManager)) .extension(new Comment()) .build(); @@ -223,7 +227,7 @@ public class ExtensionConfiguration { @Bean Controller reverseProxyController(ExtensionClient client, ReverseProxyRouterFunctionRegistry reverseProxyRouterFunctionRegistry) { - return new ControllerBuilder("reverse-proxy-controller", client) + return new ControllerBuilder("reverse-proxy", client) .reconciler(new ReverseProxyReconciler(client, reverseProxyRouterFunctionRegistry)) .extension(new ReverseProxy()) .build(); diff --git a/src/main/java/run/halo/app/extension/controller/ControllerBuilder.java b/src/main/java/run/halo/app/extension/controller/ControllerBuilder.java index e4de88979..4c30881ef 100644 --- a/src/main/java/run/halo/app/extension/controller/ControllerBuilder.java +++ b/src/main/java/run/halo/app/extension/controller/ControllerBuilder.java @@ -35,6 +35,8 @@ public class ControllerBuilder { private boolean syncAllOnStart = true; + private int workerCount = 1; + public ControllerBuilder(String name, ExtensionClient client) { Assert.hasText(name, "Extension name is required"); Assert.notNull(client, "Extension client must not be null"); @@ -88,6 +90,11 @@ public class ControllerBuilder { return this; } + public ControllerBuilder workerCount(int workerCount) { + this.workerCount = workerCount; + return this; + } + public Controller build() { if (nowSupplier == null) { nowSupplier = Instant::now; @@ -116,6 +123,7 @@ public class ControllerBuilder { extension, watcher, predicates.onAddPredicate()); - return new DefaultController<>(name, reconciler, queue, synchronizer, minDelay, maxDelay); + return new DefaultController<>(name, reconciler, queue, synchronizer, minDelay, maxDelay, + workerCount); } } diff --git a/src/main/java/run/halo/app/extension/controller/DefaultController.java b/src/main/java/run/halo/app/extension/controller/DefaultController.java index 724806a2c..70b303da8 100644 --- a/src/main/java/run/halo/app/extension/controller/DefaultController.java +++ b/src/main/java/run/halo/app/extension/controller/DefaultController.java @@ -1,15 +1,17 @@ package run.halo.app.extension.controller; -import static java.util.concurrent.Executors.newSingleThreadExecutor; - import java.time.Duration; import java.time.Instant; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.stream.IntStream; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.springframework.util.Assert; import org.springframework.util.StopWatch; import run.halo.app.extension.controller.RequestQueue.DelayedEntry; @@ -36,14 +38,9 @@ public class DefaultController implements Controller { private final Duration maxDelay; - public DefaultController(String name, - Reconciler reconciler, - RequestQueue queue, - Synchronizer synchronizer, - Duration minDelay, - Duration maxDelay) { - this(name, reconciler, queue, synchronizer, Instant::now, minDelay, maxDelay); - } + private final int workerCount; + + private final AtomicLong workerCounter; public DefaultController(String name, Reconciler reconciler, @@ -52,7 +49,8 @@ public class DefaultController implements Controller { Supplier nowSupplier, Duration minDelay, Duration maxDelay, - ExecutorService executor) { + ExecutorService executor, int workerCount) { + Assert.isTrue(workerCount > 0, "Worker count must not be less than 1"); this.name = name; this.reconciler = reconciler; this.nowSupplier = nowSupplier; @@ -61,6 +59,26 @@ public class DefaultController implements Controller { this.minDelay = minDelay; this.maxDelay = maxDelay; this.executor = executor; + this.workerCount = workerCount; + this.workerCounter = new AtomicLong(); + } + + public DefaultController(String name, + Reconciler reconciler, + RequestQueue queue, + Synchronizer synchronizer, + Duration minDelay, + Duration maxDelay) { + this(name, reconciler, queue, synchronizer, Instant::now, minDelay, maxDelay, 1); + } + + public DefaultController(String name, + Reconciler reconciler, + RequestQueue queue, + Synchronizer synchronizer, + Duration minDelay, + Duration maxDelay, int workerCount) { + this(name, reconciler, queue, synchronizer, Instant::now, minDelay, maxDelay, workerCount); } public DefaultController(String name, @@ -69,14 +87,14 @@ public class DefaultController implements Controller { Synchronizer synchronizer, Supplier nowSupplier, Duration minDelay, - Duration maxDelay) { + Duration maxDelay, int workerCount) { this(name, reconciler, queue, synchronizer, nowSupplier, minDelay, maxDelay, - newSingleThreadExecutor(threadFactory())); + Executors.newFixedThreadPool(workerCount, threadFactory(name)), workerCount); } - private static ThreadFactory threadFactory() { + private static ThreadFactory threadFactory(String name) { return new BasicThreadFactory.Builder() - .namingPattern("reconciler-thread-%d") + .namingPattern(name + "-t-%d") .daemon(false) .uncaughtExceptionHandler((t, e) -> log.error("Controller " + t.getName() + " encountered an error unexpectedly", e)) @@ -88,6 +106,10 @@ public class DefaultController implements Controller { return name; } + public int getWorkerCount() { + return workerCount; + } + @Override public void start() { if (isStarted() || isDisposed()) { @@ -96,68 +118,94 @@ public class DefaultController implements Controller { } this.started = true; log.info("Starting controller {}", name); - // TODO Make more workers run the reconciler. - executor.submit(this::run); + IntStream.range(0, getWorkerCount()) + .mapToObj(i -> new Worker()) + .forEach(executor::submit); } - protected void run() { - log.info("Controller {} started", name); - synchronizer.start(); - while (!this.isDisposed() && !Thread.currentThread().isInterrupted()) { - try { - var entry = queue.take(); - Reconciler.Result result; + /** + * Worker for controller. + * + * @author johnniang + */ + class Worker implements Runnable { + + private final String name; + + Worker() { + this.name = + DefaultController.this.getName() + "-worker-" + workerCounter.incrementAndGet(); + } + + public String getName() { + return name; + } + + @Override + public void run() { + log.info("Controller worker {} started", this.name); + synchronizer.start(); + while (!isDisposed() && !Thread.currentThread().isInterrupted()) { try { - log.debug("Reconciling request {} at {}", entry.getEntry(), nowSupplier.get()); - StopWatch watch = new StopWatch("Reconcile: " + entry.getEntry()); - watch.start("reconciliation"); - result = this.reconciler.reconcile(entry.getEntry()); - watch.stop(); - log.debug("Reconciled request: {} with result: {}", entry.getEntry(), result); - if (log.isDebugEnabled()) { - log.debug(watch.toString()); + var entry = queue.take(); + Reconciler.Result result; + try { + log.debug("{} >>> Reconciling request {} at {}", this.name, + entry.getEntry(), + nowSupplier.get()); + var watch = new StopWatch(this.name + ":reconcile: " + entry.getEntry()); + watch.start("reconciliation"); + result = reconciler.reconcile(entry.getEntry()); + watch.stop(); + log.debug("{} >>> Reconciled request: {} with result: {}", this.name, + entry.getEntry(), result); + if (log.isTraceEnabled()) { + log.trace(watch.toString()); + } + } catch (Throwable t) { + log.error("Reconciler in " + this.name + + " aborted with an error, re-enqueuing...", + t); + result = new Reconciler.Result(true, null); + } finally { + queue.done(entry.getEntry()); } - } catch (Throwable t) { - log.error("Reconciler aborted with an error, re-enqueuing...", t); - result = new Reconciler.Result(true, null); - } finally { - queue.done(entry.getEntry()); - } - if (result == null) { - result = new Reconciler.Result(false, null); - } - if (!result.reEnqueue()) { - continue; - } - var retryAfter = result.retryAfter(); - if (retryAfter == null) { - retryAfter = entry.getRetryAfter(); - if (retryAfter == null - || retryAfter.isNegative() - || retryAfter.isZero() - || retryAfter.compareTo(minDelay) < 0) { - // set min retry after - retryAfter = minDelay; - } else { - try { - // TODO Refactor the retryAfter with ratelimiter - retryAfter = retryAfter.multipliedBy(2); - } catch (ArithmeticException e) { + if (result == null) { + result = new Reconciler.Result(false, null); + } + if (!result.reEnqueue()) { + continue; + } + var retryAfter = result.retryAfter(); + if (retryAfter == null) { + retryAfter = entry.getRetryAfter(); + if (retryAfter == null + || retryAfter.isNegative() + || retryAfter.isZero() + || retryAfter.compareTo(minDelay) < 0) { + // set min retry after + retryAfter = minDelay; + } else { + try { + // TODO Refactor the retryAfter with ratelimiter + retryAfter = retryAfter.multipliedBy(2); + } catch (ArithmeticException e) { + retryAfter = maxDelay; + } + } + if (retryAfter.compareTo(maxDelay) > 0) { retryAfter = maxDelay; } } - if (retryAfter.compareTo(maxDelay) > 0) { - retryAfter = maxDelay; - } + queue.add( + new DelayedEntry<>(entry.getEntry(), retryAfter, nowSupplier)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("Controller worker {} interrupted", name); } - queue.add( - new DelayedEntry<>(entry.getEntry(), retryAfter, nowSupplier)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.info("Controller {} interrupted", name); } + log.info("Controller worker {} is stopped", name); } - log.info("Controller {} is stopped", name); } @Override diff --git a/src/main/java/run/halo/app/extension/gc/GarbageCollectorConfiguration.java b/src/main/java/run/halo/app/extension/gc/GarbageCollectorConfiguration.java index 219abc16b..af46b7db6 100644 --- a/src/main/java/run/halo/app/extension/gc/GarbageCollectorConfiguration.java +++ b/src/main/java/run/halo/app/extension/gc/GarbageCollectorConfiguration.java @@ -15,7 +15,6 @@ import run.halo.app.extension.store.ExtensionStoreClient; @Configuration(proxyBeanMethods = false) public class GarbageCollectorConfiguration { - @Bean Controller garbageCollector(ExtensionClient client, ExtensionStoreClient storeClient, @@ -30,7 +29,8 @@ public class GarbageCollectorConfiguration { queue, synchronizer, Duration.ofMillis(500), - Duration.ofSeconds(1000) - ); + Duration.ofSeconds(1000), + // TODO Make it configurable + 10); } } diff --git a/src/main/resources/application-dev.yaml b/src/main/resources/application-dev.yaml index 045844213..1c0210284 100644 --- a/src/main/resources/application-dev.yaml +++ b/src/main/resources/application-dev.yaml @@ -25,7 +25,6 @@ halo: logging: level: run.halo.app: DEBUG - org.springframework.r2dbc: DEBUG springdoc: api-docs: enabled: true diff --git a/src/test/java/run/halo/app/extension/controller/DefaultControllerTest.java b/src/test/java/run/halo/app/extension/controller/DefaultControllerTest.java index b38f9962f..6d599241b 100644 --- a/src/test/java/run/halo/app/extension/controller/DefaultControllerTest.java +++ b/src/test/java/run/halo/app/extension/controller/DefaultControllerTest.java @@ -2,6 +2,7 @@ package run.halo.app.extension.controller; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -18,6 +19,7 @@ import java.time.Instant; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -51,114 +53,133 @@ class DefaultControllerTest { @BeforeEach void setUp() { - controller = new DefaultController<>("fake-controller", reconciler, queue, synchronizer, - () -> now, minRetryAfter, maxRetryAfter, executor); + controller = createController(1); assertFalse(controller.isDisposed()); assertFalse(controller.isStarted()); } + DefaultController createController(int workerCount) { + return new DefaultController<>("fake-controller", reconciler, queue, synchronizer, + () -> now, minRetryAfter, maxRetryAfter, executor, workerCount); + } + @Test void shouldReturnRightName() { assertEquals("fake-controller", controller.getName()); } - @Test - void shouldRunCorrectlyIfReconcilerReturnsNoReEnqueue() throws InterruptedException { - when(queue.take()).thenReturn(new DelayedEntry<>( - new Request("fake-request"), Duration.ofSeconds(1), () -> now - )) - .thenThrow(InterruptedException.class); - when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(false, null)); + @Nested + class WorkerTest { - controller.run(); + @Test + void shouldCreateCorrectName() { + var worker = controller.new Worker(); + assertEquals("fake-controller-worker-1", worker.getName()); + worker = controller.new Worker(); + assertEquals("fake-controller-worker-2", worker.getName()); + worker = controller.new Worker(); + assertEquals("fake-controller-worker-3", worker.getName()); + } - verify(synchronizer, times(1)).start(); - verify(queue, times(2)).take(); - verify(queue, times(0)).add(any()); - verify(queue, times(1)).done(any()); - verify(reconciler, times(1)).reconcile(eq(new Request("fake-request"))); - } + @Test + void shouldRunCorrectlyIfReconcilerReturnsNoReEnqueue() throws InterruptedException { + when(queue.take()).thenReturn(new DelayedEntry<>( + new Request("fake-request"), Duration.ofSeconds(1), () -> now + )) + .thenThrow(InterruptedException.class); + when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(false, null)); - @Test - void shouldRunCorrectlyIfReconcilerReturnsReEnqueue() throws InterruptedException { - when(queue.take()).thenReturn(new DelayedEntry<>( - new Request("fake-request"), Duration.ofSeconds(1), () -> now - )) - .thenThrow(InterruptedException.class); - when(queue.add(any())).thenReturn(true); - when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null)); + controller.new Worker().run(); - controller.run(); + verify(synchronizer, times(1)).start(); + verify(queue, times(2)).take(); + verify(queue, times(0)).add(any()); + verify(queue, times(1)).done(any()); + verify(reconciler, times(1)).reconcile(eq(new Request("fake-request"))); + } - verify(synchronizer, times(1)).start(); - verify(queue, times(2)).take(); - verify(queue, times(1)).done(any()); - verify(queue, times(1)).add(argThat(de -> - de.getEntry().name().equals("fake-request") - && de.getRetryAfter().equals(Duration.ofSeconds(2)))); - verify(reconciler, times(1)).reconcile(any(Request.class)); - } + @Test + void shouldRunCorrectlyIfReconcilerReturnsReEnqueue() throws InterruptedException { + when(queue.take()).thenReturn(new DelayedEntry<>( + new Request("fake-request"), Duration.ofSeconds(1), () -> now + )) + .thenThrow(InterruptedException.class); + when(queue.add(any())).thenReturn(true); + when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null)); - @Test - void shouldReRunIfReconcilerThrowException() throws InterruptedException { - when(queue.take()).thenReturn(new DelayedEntry<>( - new Request("fake-request"), Duration.ofSeconds(1), () -> now - )) - .thenThrow(InterruptedException.class); - when(queue.add(any())).thenReturn(true); - when(reconciler.reconcile(any(Request.class))).thenThrow(RuntimeException.class); + controller.new Worker().run(); - controller.run(); + verify(synchronizer, times(1)).start(); + verify(queue, times(2)).take(); + verify(queue, times(1)).done(any()); + verify(queue, times(1)).add(argThat(de -> + de.getEntry().name().equals("fake-request") + && de.getRetryAfter().equals(Duration.ofSeconds(2)))); + verify(reconciler, times(1)).reconcile(any(Request.class)); + } - verify(synchronizer, times(1)).start(); - verify(queue, times(2)).take(); - verify(queue, times(1)).done(any()); - verify(queue, times(1)).add(argThat(de -> - de.getEntry().name().equals("fake-request") - && de.getRetryAfter().equals(Duration.ofSeconds(2)))); - verify(reconciler, times(1)).reconcile(any(Request.class)); - } + @Test + void shouldReRunIfReconcilerThrowException() throws InterruptedException { + when(queue.take()).thenReturn(new DelayedEntry<>( + new Request("fake-request"), Duration.ofSeconds(1), () -> now + )) + .thenThrow(InterruptedException.class); + when(queue.add(any())).thenReturn(true); + when(reconciler.reconcile(any(Request.class))).thenThrow(RuntimeException.class); - @Test - void shouldSetMinRetryAfterWhenTakeZeroDelayedEntry() throws InterruptedException { - when(queue.take()).thenReturn(new DelayedEntry<>( - new Request("fake-request"), minRetryAfter.minusMillis(1), () -> now - )) - .thenThrow(InterruptedException.class); - when(queue.add(any())).thenReturn(true); - when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null)); + controller.new Worker().run(); - controller.run(); + verify(synchronizer, times(1)).start(); + verify(queue, times(2)).take(); + verify(queue, times(1)).done(any()); + verify(queue, times(1)).add(argThat(de -> + de.getEntry().name().equals("fake-request") + && de.getRetryAfter().equals(Duration.ofSeconds(2)))); + verify(reconciler, times(1)).reconcile(any(Request.class)); + } - verify(synchronizer, times(1)).start(); - verify(queue, times(2)).take(); - verify(queue, times(1)).done(any()); - verify(queue, times(1)).add(argThat(de -> - de.getEntry().name().equals("fake-request") - && de.getRetryAfter().equals(minRetryAfter))); - verify(reconciler, times(1)).reconcile(any(Request.class)); - } + @Test + void shouldSetMinRetryAfterWhenTakeZeroDelayedEntry() throws InterruptedException { + when(queue.take()).thenReturn(new DelayedEntry<>( + new Request("fake-request"), minRetryAfter.minusMillis(1), () -> now + )) + .thenThrow(InterruptedException.class); + when(queue.add(any())).thenReturn(true); + when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null)); - @Test - void shouldSetMaxRetryAfterWhenTakeGreaterThanMaxRetryAfterDelayedEntry() - throws InterruptedException { - when(queue.take()).thenReturn(new DelayedEntry<>( - new Request("fake-request"), maxRetryAfter.plusMillis(1), () -> now - )) - .thenThrow(InterruptedException.class); - when(queue.add(any())).thenReturn(true); - when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null)); + controller.new Worker().run(); - controller.run(); + verify(synchronizer, times(1)).start(); + verify(queue, times(2)).take(); + verify(queue, times(1)).done(any()); + verify(queue, times(1)).add(argThat(de -> + de.getEntry().name().equals("fake-request") + && de.getRetryAfter().equals(minRetryAfter))); + verify(reconciler, times(1)).reconcile(any(Request.class)); + } + + @Test + void shouldSetMaxRetryAfterWhenTakeGreaterThanMaxRetryAfterDelayedEntry() + throws InterruptedException { + when(queue.take()).thenReturn(new DelayedEntry<>( + new Request("fake-request"), maxRetryAfter.plusMillis(1), () -> now + )) + .thenThrow(InterruptedException.class); + when(queue.add(any())).thenReturn(true); + when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null)); + + controller.new Worker().run(); + + verify(synchronizer, times(1)).start(); + verify(queue, times(2)).take(); + verify(queue, times(1)).done(any()); + verify(queue, times(1)).add(argThat(de -> + de.getEntry().name().equals("fake-request") + && de.getRetryAfter().equals(maxRetryAfter))); + verify(reconciler, times(1)).reconcile(any(Request.class)); + } - verify(synchronizer, times(1)).start(); - verify(queue, times(2)).take(); - verify(queue, times(1)).done(any()); - verify(queue, times(1)).add(argThat(de -> - de.getEntry().name().equals("fake-request") - && de.getRetryAfter().equals(maxRetryAfter))); - verify(reconciler, times(1)).reconcile(any(Request.class)); } @Test @@ -221,4 +242,17 @@ class DefaultControllerTest { verify(executor, times(0)).submit(any(Runnable.class)); } + + @Test + void shouldCreateMultiWorkers() { + controller = createController(5); + controller.start(); + verify(executor, times(5)).submit(any(DefaultController.Worker.class)); + } + + @Test + void shouldFailToCreateControllerDueToInvalidWorkerCount() { + assertThrows(IllegalArgumentException.class, () -> createController(0)); + assertThrows(IllegalArgumentException.class, () -> createController(-1)); + } } \ No newline at end of file