Enable adding more workers for controller (#2718)

#### What type of PR is this?

/kind feature
/kind improvement
/area core
/milestone 2.0

#### What this PR does / why we need it:

This PR enables adding more workers for controller to speed up reconciliations. Default woker count is 1 for one controller.

**What's next?**

- [ ] Enable configuring worker count for every controller in configuration properties.

#### Which issue(s) this PR fixes:

Fixes https://github.com/halo-dev/halo/issues/2708

#### Special notes for reviewers

You can see there are more threads for post controller in the following screenshot:

![image](https://user-images.githubusercontent.com/16865714/202608470-8763826a-a69c-47b5-8f41-f0612919d681.png)

#### Does this PR introduce a user-facing change?

```release-note
None
```
pull/2720/head^2
John Niang 2022-11-18 16:08:22 +08:00 committed by GitHub
parent c8bc96ffc3
commit e87067eb60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 264 additions and 171 deletions

View File

@ -94,7 +94,7 @@ public class ExtensionConfiguration {
@Bean @Bean
Controller userController(ExtensionClient client) { Controller userController(ExtensionClient client) {
return new ControllerBuilder("user-controller", client) return new ControllerBuilder("user", client)
.reconciler(new UserReconciler(client)) .reconciler(new UserReconciler(client))
.extension(new User()) .extension(new User())
.build(); .build();
@ -102,7 +102,7 @@ public class ExtensionConfiguration {
@Bean @Bean
Controller roleController(ExtensionClient client, RoleService roleService) { Controller roleController(ExtensionClient client, RoleService roleService) {
return new ControllerBuilder("role-controller", client) return new ControllerBuilder("role", client)
.reconciler(new RoleReconciler(client, roleService)) .reconciler(new RoleReconciler(client, roleService))
.extension(new Role()) .extension(new Role())
.build(); .build();
@ -110,7 +110,7 @@ public class ExtensionConfiguration {
@Bean @Bean
Controller roleBindingController(ExtensionClient client) { Controller roleBindingController(ExtensionClient client) {
return new ControllerBuilder("role-binding-controller", client) return new ControllerBuilder("role-binding", client)
.reconciler(new RoleBindingReconciler(client)) .reconciler(new RoleBindingReconciler(client))
.extension(new RoleBinding()) .extension(new RoleBinding())
.build(); .build();
@ -118,7 +118,7 @@ public class ExtensionConfiguration {
@Bean @Bean
Controller pluginController(ExtensionClient client, HaloPluginManager haloPluginManager) { Controller pluginController(ExtensionClient client, HaloPluginManager haloPluginManager) {
return new ControllerBuilder("plugin-controller", client) return new ControllerBuilder("plugin", client)
.reconciler(new PluginReconciler(client, haloPluginManager)) .reconciler(new PluginReconciler(client, haloPluginManager))
.extension(new Plugin()) .extension(new Plugin())
.build(); .build();
@ -126,7 +126,7 @@ public class ExtensionConfiguration {
@Bean @Bean
Controller menuController(ExtensionClient client) { Controller menuController(ExtensionClient client) {
return new ControllerBuilder("menu-controller", client) return new ControllerBuilder("menu", client)
.reconciler(new MenuReconciler(client)) .reconciler(new MenuReconciler(client))
.extension(new Menu()) .extension(new Menu())
.build(); .build();
@ -134,7 +134,7 @@ public class ExtensionConfiguration {
@Bean @Bean
Controller menuItemController(ExtensionClient client) { Controller menuItemController(ExtensionClient client) {
return new ControllerBuilder("menu-item-controller", client) return new ControllerBuilder("menu-item", client)
.reconciler(new MenuItemReconciler(client)) .reconciler(new MenuItemReconciler(client))
.extension(new MenuItem()) .extension(new MenuItem())
.build(); .build();
@ -142,7 +142,7 @@ public class ExtensionConfiguration {
@Bean @Bean
Controller themeController(ExtensionClient client, HaloProperties haloProperties) { Controller themeController(ExtensionClient client, HaloProperties haloProperties) {
return new ControllerBuilder("theme-controller", client) return new ControllerBuilder("theme", client)
.reconciler(new ThemeReconciler(client, haloProperties)) .reconciler(new ThemeReconciler(client, haloProperties))
.extension(new Theme()) .extension(new Theme())
.build(); .build();
@ -152,18 +152,20 @@ public class ExtensionConfiguration {
Controller postController(ExtensionClient client, ContentService contentService, Controller postController(ExtensionClient client, ContentService contentService,
PostPermalinkPolicy postPermalinkPolicy, CounterService counterService, PostPermalinkPolicy postPermalinkPolicy, CounterService counterService,
PostService postService) { PostService postService) {
return new ControllerBuilder("post-controller", client) return new ControllerBuilder("post", client)
.reconciler(new PostReconciler(client, contentService, postService, .reconciler(new PostReconciler(client, contentService, postService,
postPermalinkPolicy, postPermalinkPolicy,
counterService)) counterService))
.extension(new Post()) .extension(new Post())
// TODO Make it configurable
.workerCount(10)
.build(); .build();
} }
@Bean @Bean
Controller categoryController(ExtensionClient client, Controller categoryController(ExtensionClient client,
CategoryPermalinkPolicy categoryPermalinkPolicy) { CategoryPermalinkPolicy categoryPermalinkPolicy) {
return new ControllerBuilder("category-controller", client) return new ControllerBuilder("category", client)
.reconciler(new CategoryReconciler(client, categoryPermalinkPolicy)) .reconciler(new CategoryReconciler(client, categoryPermalinkPolicy))
.extension(new Category()) .extension(new Category())
.build(); .build();
@ -171,7 +173,7 @@ public class ExtensionConfiguration {
@Bean @Bean
Controller tagController(ExtensionClient client, TagPermalinkPolicy tagPermalinkPolicy) { Controller tagController(ExtensionClient client, TagPermalinkPolicy tagPermalinkPolicy) {
return new ControllerBuilder("tag-controller", client) return new ControllerBuilder("tag", client)
.reconciler(new TagReconciler(client, tagPermalinkPolicy)) .reconciler(new TagReconciler(client, tagPermalinkPolicy))
.extension(new Tag()) .extension(new Tag())
.build(); .build();
@ -181,7 +183,7 @@ public class ExtensionConfiguration {
Controller systemSettingController(ExtensionClient client, Controller systemSettingController(ExtensionClient client,
SystemConfigurableEnvironmentFetcher environmentFetcher, SystemConfigurableEnvironmentFetcher environmentFetcher,
ApplicationContext applicationContext) { ApplicationContext applicationContext) {
return new ControllerBuilder("system-setting-controller", client) return new ControllerBuilder("system-setting", client)
.reconciler(new SystemSettingReconciler(client, environmentFetcher, .reconciler(new SystemSettingReconciler(client, environmentFetcher,
applicationContext)) applicationContext))
.extension(new ConfigMap()) .extension(new ConfigMap())
@ -192,7 +194,7 @@ public class ExtensionConfiguration {
Controller attachmentController(ExtensionClient client, Controller attachmentController(ExtensionClient client,
ExtensionComponentsFinder extensionComponentsFinder, ExtensionComponentsFinder extensionComponentsFinder,
ExternalUrlSupplier externalUrl) { ExternalUrlSupplier externalUrl) {
return new ControllerBuilder("attachment-controller", client) return new ControllerBuilder("attachment", client)
.reconciler( .reconciler(
new AttachmentReconciler(client, extensionComponentsFinder, externalUrl)) new AttachmentReconciler(client, extensionComponentsFinder, externalUrl))
.extension(new Attachment()) .extension(new Attachment())
@ -203,7 +205,7 @@ public class ExtensionConfiguration {
Controller singlePageController(ExtensionClient client, ContentService contentService, Controller singlePageController(ExtensionClient client, ContentService contentService,
ApplicationContext applicationContext, CounterService counterService, ApplicationContext applicationContext, CounterService counterService,
SinglePageService singlePageService, ExternalUrlSupplier externalUrlSupplier) { SinglePageService singlePageService, ExternalUrlSupplier externalUrlSupplier) {
return new ControllerBuilder("single-page-controller", client) return new ControllerBuilder("single-page", client)
.reconciler(new SinglePageReconciler(client, contentService, .reconciler(new SinglePageReconciler(client, contentService,
applicationContext, singlePageService, counterService, externalUrlSupplier) applicationContext, singlePageService, counterService, externalUrlSupplier)
) )
@ -214,7 +216,9 @@ public class ExtensionConfiguration {
@Bean @Bean
Controller commentController(ExtensionClient client, MeterRegistry meterRegistry, Controller commentController(ExtensionClient client, MeterRegistry meterRegistry,
SchemeManager schemeManager) { SchemeManager schemeManager) {
return new ControllerBuilder("comment-controller", client) return new ControllerBuilder("comment", client)
// TODO Make it configurable
.workerCount(10)
.reconciler(new CommentReconciler(client, meterRegistry, schemeManager)) .reconciler(new CommentReconciler(client, meterRegistry, schemeManager))
.extension(new Comment()) .extension(new Comment())
.build(); .build();
@ -223,7 +227,7 @@ public class ExtensionConfiguration {
@Bean @Bean
Controller reverseProxyController(ExtensionClient client, Controller reverseProxyController(ExtensionClient client,
ReverseProxyRouterFunctionRegistry reverseProxyRouterFunctionRegistry) { ReverseProxyRouterFunctionRegistry reverseProxyRouterFunctionRegistry) {
return new ControllerBuilder("reverse-proxy-controller", client) return new ControllerBuilder("reverse-proxy", client)
.reconciler(new ReverseProxyReconciler(client, reverseProxyRouterFunctionRegistry)) .reconciler(new ReverseProxyReconciler(client, reverseProxyRouterFunctionRegistry))
.extension(new ReverseProxy()) .extension(new ReverseProxy())
.build(); .build();

View File

@ -35,6 +35,8 @@ public class ControllerBuilder {
private boolean syncAllOnStart = true; private boolean syncAllOnStart = true;
private int workerCount = 1;
public ControllerBuilder(String name, ExtensionClient client) { public ControllerBuilder(String name, ExtensionClient client) {
Assert.hasText(name, "Extension name is required"); Assert.hasText(name, "Extension name is required");
Assert.notNull(client, "Extension client must not be null"); Assert.notNull(client, "Extension client must not be null");
@ -88,6 +90,11 @@ public class ControllerBuilder {
return this; return this;
} }
public ControllerBuilder workerCount(int workerCount) {
this.workerCount = workerCount;
return this;
}
public Controller build() { public Controller build() {
if (nowSupplier == null) { if (nowSupplier == null) {
nowSupplier = Instant::now; nowSupplier = Instant::now;
@ -116,6 +123,7 @@ public class ControllerBuilder {
extension, extension,
watcher, watcher,
predicates.onAddPredicate()); predicates.onAddPredicate());
return new DefaultController<>(name, reconciler, queue, synchronizer, minDelay, maxDelay); return new DefaultController<>(name, reconciler, queue, synchronizer, minDelay, maxDelay,
workerCount);
} }
} }

View File

@ -1,15 +1,17 @@
package run.halo.app.extension.controller; package run.halo.app.extension.controller;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.util.Assert;
import org.springframework.util.StopWatch; import org.springframework.util.StopWatch;
import run.halo.app.extension.controller.RequestQueue.DelayedEntry; import run.halo.app.extension.controller.RequestQueue.DelayedEntry;
@ -36,14 +38,9 @@ public class DefaultController<R> implements Controller {
private final Duration maxDelay; private final Duration maxDelay;
public DefaultController(String name, private final int workerCount;
Reconciler<R> reconciler,
RequestQueue<R> queue, private final AtomicLong workerCounter;
Synchronizer<R> synchronizer,
Duration minDelay,
Duration maxDelay) {
this(name, reconciler, queue, synchronizer, Instant::now, minDelay, maxDelay);
}
public DefaultController(String name, public DefaultController(String name,
Reconciler<R> reconciler, Reconciler<R> reconciler,
@ -52,7 +49,8 @@ public class DefaultController<R> implements Controller {
Supplier<Instant> nowSupplier, Supplier<Instant> nowSupplier,
Duration minDelay, Duration minDelay,
Duration maxDelay, Duration maxDelay,
ExecutorService executor) { ExecutorService executor, int workerCount) {
Assert.isTrue(workerCount > 0, "Worker count must not be less than 1");
this.name = name; this.name = name;
this.reconciler = reconciler; this.reconciler = reconciler;
this.nowSupplier = nowSupplier; this.nowSupplier = nowSupplier;
@ -61,6 +59,26 @@ public class DefaultController<R> implements Controller {
this.minDelay = minDelay; this.minDelay = minDelay;
this.maxDelay = maxDelay; this.maxDelay = maxDelay;
this.executor = executor; this.executor = executor;
this.workerCount = workerCount;
this.workerCounter = new AtomicLong();
}
public DefaultController(String name,
Reconciler<R> reconciler,
RequestQueue<R> queue,
Synchronizer<R> synchronizer,
Duration minDelay,
Duration maxDelay) {
this(name, reconciler, queue, synchronizer, Instant::now, minDelay, maxDelay, 1);
}
public DefaultController(String name,
Reconciler<R> reconciler,
RequestQueue<R> queue,
Synchronizer<R> synchronizer,
Duration minDelay,
Duration maxDelay, int workerCount) {
this(name, reconciler, queue, synchronizer, Instant::now, minDelay, maxDelay, workerCount);
} }
public DefaultController(String name, public DefaultController(String name,
@ -69,14 +87,14 @@ public class DefaultController<R> implements Controller {
Synchronizer<R> synchronizer, Synchronizer<R> synchronizer,
Supplier<Instant> nowSupplier, Supplier<Instant> nowSupplier,
Duration minDelay, Duration minDelay,
Duration maxDelay) { Duration maxDelay, int workerCount) {
this(name, reconciler, queue, synchronizer, nowSupplier, minDelay, maxDelay, this(name, reconciler, queue, synchronizer, nowSupplier, minDelay, maxDelay,
newSingleThreadExecutor(threadFactory())); Executors.newFixedThreadPool(workerCount, threadFactory(name)), workerCount);
} }
private static ThreadFactory threadFactory() { private static ThreadFactory threadFactory(String name) {
return new BasicThreadFactory.Builder() return new BasicThreadFactory.Builder()
.namingPattern("reconciler-thread-%d") .namingPattern(name + "-t-%d")
.daemon(false) .daemon(false)
.uncaughtExceptionHandler((t, e) -> .uncaughtExceptionHandler((t, e) ->
log.error("Controller " + t.getName() + " encountered an error unexpectedly", e)) log.error("Controller " + t.getName() + " encountered an error unexpectedly", e))
@ -88,6 +106,10 @@ public class DefaultController<R> implements Controller {
return name; return name;
} }
public int getWorkerCount() {
return workerCount;
}
@Override @Override
public void start() { public void start() {
if (isStarted() || isDisposed()) { if (isStarted() || isDisposed()) {
@ -96,68 +118,94 @@ public class DefaultController<R> implements Controller {
} }
this.started = true; this.started = true;
log.info("Starting controller {}", name); log.info("Starting controller {}", name);
// TODO Make more workers run the reconciler. IntStream.range(0, getWorkerCount())
executor.submit(this::run); .mapToObj(i -> new Worker())
.forEach(executor::submit);
} }
protected void run() { /**
log.info("Controller {} started", name); * Worker for controller.
synchronizer.start(); *
while (!this.isDisposed() && !Thread.currentThread().isInterrupted()) { * @author johnniang
try { */
var entry = queue.take(); class Worker implements Runnable {
Reconciler.Result result;
private final String name;
Worker() {
this.name =
DefaultController.this.getName() + "-worker-" + workerCounter.incrementAndGet();
}
public String getName() {
return name;
}
@Override
public void run() {
log.info("Controller worker {} started", this.name);
synchronizer.start();
while (!isDisposed() && !Thread.currentThread().isInterrupted()) {
try { try {
log.debug("Reconciling request {} at {}", entry.getEntry(), nowSupplier.get()); var entry = queue.take();
StopWatch watch = new StopWatch("Reconcile: " + entry.getEntry()); Reconciler.Result result;
watch.start("reconciliation"); try {
result = this.reconciler.reconcile(entry.getEntry()); log.debug("{} >>> Reconciling request {} at {}", this.name,
watch.stop(); entry.getEntry(),
log.debug("Reconciled request: {} with result: {}", entry.getEntry(), result); nowSupplier.get());
if (log.isDebugEnabled()) { var watch = new StopWatch(this.name + ":reconcile: " + entry.getEntry());
log.debug(watch.toString()); watch.start("reconciliation");
result = reconciler.reconcile(entry.getEntry());
watch.stop();
log.debug("{} >>> Reconciled request: {} with result: {}", this.name,
entry.getEntry(), result);
if (log.isTraceEnabled()) {
log.trace(watch.toString());
}
} catch (Throwable t) {
log.error("Reconciler in " + this.name
+ " aborted with an error, re-enqueuing...",
t);
result = new Reconciler.Result(true, null);
} finally {
queue.done(entry.getEntry());
} }
} catch (Throwable t) { if (result == null) {
log.error("Reconciler aborted with an error, re-enqueuing...", t); result = new Reconciler.Result(false, null);
result = new Reconciler.Result(true, null); }
} finally { if (!result.reEnqueue()) {
queue.done(entry.getEntry()); continue;
} }
if (result == null) { var retryAfter = result.retryAfter();
result = new Reconciler.Result(false, null); if (retryAfter == null) {
} retryAfter = entry.getRetryAfter();
if (!result.reEnqueue()) { if (retryAfter == null
continue; || retryAfter.isNegative()
} || retryAfter.isZero()
var retryAfter = result.retryAfter(); || retryAfter.compareTo(minDelay) < 0) {
if (retryAfter == null) { // set min retry after
retryAfter = entry.getRetryAfter(); retryAfter = minDelay;
if (retryAfter == null } else {
|| retryAfter.isNegative() try {
|| retryAfter.isZero() // TODO Refactor the retryAfter with ratelimiter
|| retryAfter.compareTo(minDelay) < 0) { retryAfter = retryAfter.multipliedBy(2);
// set min retry after } catch (ArithmeticException e) {
retryAfter = minDelay; retryAfter = maxDelay;
} else { }
try { }
// TODO Refactor the retryAfter with ratelimiter if (retryAfter.compareTo(maxDelay) > 0) {
retryAfter = retryAfter.multipliedBy(2);
} catch (ArithmeticException e) {
retryAfter = maxDelay; retryAfter = maxDelay;
} }
} }
if (retryAfter.compareTo(maxDelay) > 0) { queue.add(
retryAfter = maxDelay; new DelayedEntry<>(entry.getEntry(), retryAfter, nowSupplier));
} } catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("Controller worker {} interrupted", name);
} }
queue.add(
new DelayedEntry<>(entry.getEntry(), retryAfter, nowSupplier));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("Controller {} interrupted", name);
} }
log.info("Controller worker {} is stopped", name);
} }
log.info("Controller {} is stopped", name);
} }
@Override @Override

View File

@ -15,7 +15,6 @@ import run.halo.app.extension.store.ExtensionStoreClient;
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
public class GarbageCollectorConfiguration { public class GarbageCollectorConfiguration {
@Bean @Bean
Controller garbageCollector(ExtensionClient client, Controller garbageCollector(ExtensionClient client,
ExtensionStoreClient storeClient, ExtensionStoreClient storeClient,
@ -30,7 +29,8 @@ public class GarbageCollectorConfiguration {
queue, queue,
synchronizer, synchronizer,
Duration.ofMillis(500), Duration.ofMillis(500),
Duration.ofSeconds(1000) Duration.ofSeconds(1000),
); // TODO Make it configurable
10);
} }
} }

View File

@ -25,7 +25,6 @@ halo:
logging: logging:
level: level:
run.halo.app: DEBUG run.halo.app: DEBUG
org.springframework.r2dbc: DEBUG
springdoc: springdoc:
api-docs: api-docs:
enabled: true enabled: true

View File

@ -2,6 +2,7 @@ package run.halo.app.extension.controller;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
@ -18,6 +19,7 @@ import java.time.Instant;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;
@ -51,114 +53,133 @@ class DefaultControllerTest {
@BeforeEach @BeforeEach
void setUp() { void setUp() {
controller = new DefaultController<>("fake-controller", reconciler, queue, synchronizer, controller = createController(1);
() -> now, minRetryAfter, maxRetryAfter, executor);
assertFalse(controller.isDisposed()); assertFalse(controller.isDisposed());
assertFalse(controller.isStarted()); assertFalse(controller.isStarted());
} }
DefaultController<Request> createController(int workerCount) {
return new DefaultController<>("fake-controller", reconciler, queue, synchronizer,
() -> now, minRetryAfter, maxRetryAfter, executor, workerCount);
}
@Test @Test
void shouldReturnRightName() { void shouldReturnRightName() {
assertEquals("fake-controller", controller.getName()); assertEquals("fake-controller", controller.getName());
} }
@Test @Nested
void shouldRunCorrectlyIfReconcilerReturnsNoReEnqueue() throws InterruptedException { class WorkerTest {
when(queue.take()).thenReturn(new DelayedEntry<>(
new Request("fake-request"), Duration.ofSeconds(1), () -> now
))
.thenThrow(InterruptedException.class);
when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(false, null));
controller.run(); @Test
void shouldCreateCorrectName() {
var worker = controller.new Worker();
assertEquals("fake-controller-worker-1", worker.getName());
worker = controller.new Worker();
assertEquals("fake-controller-worker-2", worker.getName());
worker = controller.new Worker();
assertEquals("fake-controller-worker-3", worker.getName());
}
verify(synchronizer, times(1)).start(); @Test
verify(queue, times(2)).take(); void shouldRunCorrectlyIfReconcilerReturnsNoReEnqueue() throws InterruptedException {
verify(queue, times(0)).add(any()); when(queue.take()).thenReturn(new DelayedEntry<>(
verify(queue, times(1)).done(any()); new Request("fake-request"), Duration.ofSeconds(1), () -> now
verify(reconciler, times(1)).reconcile(eq(new Request("fake-request"))); ))
} .thenThrow(InterruptedException.class);
when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(false, null));
@Test controller.new Worker().run();
void shouldRunCorrectlyIfReconcilerReturnsReEnqueue() throws InterruptedException {
when(queue.take()).thenReturn(new DelayedEntry<>(
new Request("fake-request"), Duration.ofSeconds(1), () -> now
))
.thenThrow(InterruptedException.class);
when(queue.add(any())).thenReturn(true);
when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null));
controller.run(); verify(synchronizer, times(1)).start();
verify(queue, times(2)).take();
verify(queue, times(0)).add(any());
verify(queue, times(1)).done(any());
verify(reconciler, times(1)).reconcile(eq(new Request("fake-request")));
}
verify(synchronizer, times(1)).start(); @Test
verify(queue, times(2)).take(); void shouldRunCorrectlyIfReconcilerReturnsReEnqueue() throws InterruptedException {
verify(queue, times(1)).done(any()); when(queue.take()).thenReturn(new DelayedEntry<>(
verify(queue, times(1)).add(argThat(de -> new Request("fake-request"), Duration.ofSeconds(1), () -> now
de.getEntry().name().equals("fake-request") ))
&& de.getRetryAfter().equals(Duration.ofSeconds(2)))); .thenThrow(InterruptedException.class);
verify(reconciler, times(1)).reconcile(any(Request.class)); when(queue.add(any())).thenReturn(true);
} when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null));
@Test controller.new Worker().run();
void shouldReRunIfReconcilerThrowException() throws InterruptedException {
when(queue.take()).thenReturn(new DelayedEntry<>(
new Request("fake-request"), Duration.ofSeconds(1), () -> now
))
.thenThrow(InterruptedException.class);
when(queue.add(any())).thenReturn(true);
when(reconciler.reconcile(any(Request.class))).thenThrow(RuntimeException.class);
controller.run(); verify(synchronizer, times(1)).start();
verify(queue, times(2)).take();
verify(queue, times(1)).done(any());
verify(queue, times(1)).add(argThat(de ->
de.getEntry().name().equals("fake-request")
&& de.getRetryAfter().equals(Duration.ofSeconds(2))));
verify(reconciler, times(1)).reconcile(any(Request.class));
}
verify(synchronizer, times(1)).start(); @Test
verify(queue, times(2)).take(); void shouldReRunIfReconcilerThrowException() throws InterruptedException {
verify(queue, times(1)).done(any()); when(queue.take()).thenReturn(new DelayedEntry<>(
verify(queue, times(1)).add(argThat(de -> new Request("fake-request"), Duration.ofSeconds(1), () -> now
de.getEntry().name().equals("fake-request") ))
&& de.getRetryAfter().equals(Duration.ofSeconds(2)))); .thenThrow(InterruptedException.class);
verify(reconciler, times(1)).reconcile(any(Request.class)); when(queue.add(any())).thenReturn(true);
} when(reconciler.reconcile(any(Request.class))).thenThrow(RuntimeException.class);
@Test controller.new Worker().run();
void shouldSetMinRetryAfterWhenTakeZeroDelayedEntry() throws InterruptedException {
when(queue.take()).thenReturn(new DelayedEntry<>(
new Request("fake-request"), minRetryAfter.minusMillis(1), () -> now
))
.thenThrow(InterruptedException.class);
when(queue.add(any())).thenReturn(true);
when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null));
controller.run(); verify(synchronizer, times(1)).start();
verify(queue, times(2)).take();
verify(queue, times(1)).done(any());
verify(queue, times(1)).add(argThat(de ->
de.getEntry().name().equals("fake-request")
&& de.getRetryAfter().equals(Duration.ofSeconds(2))));
verify(reconciler, times(1)).reconcile(any(Request.class));
}
verify(synchronizer, times(1)).start(); @Test
verify(queue, times(2)).take(); void shouldSetMinRetryAfterWhenTakeZeroDelayedEntry() throws InterruptedException {
verify(queue, times(1)).done(any()); when(queue.take()).thenReturn(new DelayedEntry<>(
verify(queue, times(1)).add(argThat(de -> new Request("fake-request"), minRetryAfter.minusMillis(1), () -> now
de.getEntry().name().equals("fake-request") ))
&& de.getRetryAfter().equals(minRetryAfter))); .thenThrow(InterruptedException.class);
verify(reconciler, times(1)).reconcile(any(Request.class)); when(queue.add(any())).thenReturn(true);
} when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null));
@Test controller.new Worker().run();
void shouldSetMaxRetryAfterWhenTakeGreaterThanMaxRetryAfterDelayedEntry()
throws InterruptedException {
when(queue.take()).thenReturn(new DelayedEntry<>(
new Request("fake-request"), maxRetryAfter.plusMillis(1), () -> now
))
.thenThrow(InterruptedException.class);
when(queue.add(any())).thenReturn(true);
when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null));
controller.run(); verify(synchronizer, times(1)).start();
verify(queue, times(2)).take();
verify(queue, times(1)).done(any());
verify(queue, times(1)).add(argThat(de ->
de.getEntry().name().equals("fake-request")
&& de.getRetryAfter().equals(minRetryAfter)));
verify(reconciler, times(1)).reconcile(any(Request.class));
}
@Test
void shouldSetMaxRetryAfterWhenTakeGreaterThanMaxRetryAfterDelayedEntry()
throws InterruptedException {
when(queue.take()).thenReturn(new DelayedEntry<>(
new Request("fake-request"), maxRetryAfter.plusMillis(1), () -> now
))
.thenThrow(InterruptedException.class);
when(queue.add(any())).thenReturn(true);
when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null));
controller.new Worker().run();
verify(synchronizer, times(1)).start();
verify(queue, times(2)).take();
verify(queue, times(1)).done(any());
verify(queue, times(1)).add(argThat(de ->
de.getEntry().name().equals("fake-request")
&& de.getRetryAfter().equals(maxRetryAfter)));
verify(reconciler, times(1)).reconcile(any(Request.class));
}
verify(synchronizer, times(1)).start();
verify(queue, times(2)).take();
verify(queue, times(1)).done(any());
verify(queue, times(1)).add(argThat(de ->
de.getEntry().name().equals("fake-request")
&& de.getRetryAfter().equals(maxRetryAfter)));
verify(reconciler, times(1)).reconcile(any(Request.class));
} }
@Test @Test
@ -221,4 +242,17 @@ class DefaultControllerTest {
verify(executor, times(0)).submit(any(Runnable.class)); verify(executor, times(0)).submit(any(Runnable.class));
} }
@Test
void shouldCreateMultiWorkers() {
controller = createController(5);
controller.start();
verify(executor, times(5)).submit(any(DefaultController.Worker.class));
}
@Test
void shouldFailToCreateControllerDueToInvalidWorkerCount() {
assertThrows(IllegalArgumentException.class, () -> createController(0));
assertThrows(IllegalArgumentException.class, () -> createController(-1));
}
} }