From 00c8cbb7bbf28a51bbae573dbb62f5b04c6a324e Mon Sep 17 00:00:00 2001 From: John Niang Date: Wed, 5 Mar 2025 10:32:57 +0800 Subject: [PATCH] Enable Virtual Thread when running on JVM 21 (#7261) #### What type of PR is this? /kind improvement /area core /milestone 2.20.x #### What this PR does / why we need it: This PR enables Virtual Thread for instances running on JVM 21. This won't affect instances running on JVM 17. References: - https://spring.io/blog/2023/10/31/what-new-is-coming-in-reactor-core-3-6-0 - https://spring.io/blog/2022/10/11/embracing-virtual-threads - https://docs.spring.io/spring-boot/3.4/reference/features/task-execution-and-scheduling.html #### Does this PR introduce a user-facing change? ```release-note None ``` --- Dockerfile | 2 +- .../controller/DefaultController.java | 86 +++++++++++++++---- .../controller/DefaultControllerTest.java | 37 +++----- .../src/main/resources/application.yaml | 3 + 4 files changed, 86 insertions(+), 42 deletions(-) diff --git a/Dockerfile b/Dockerfile index 369303def..3737e33a8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,4 +25,4 @@ RUN ln -sf /usr/share/zoneinfo/$TZ /etc/localtime \ Expose 8090 -ENTRYPOINT ["sh", "-c", "java ${JVM_OPTS} org.springframework.boot.loader.launch.JarLauncher ${0} ${@}"] +ENTRYPOINT ["sh", "-c", "java -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true ${JVM_OPTS} org.springframework.boot.loader.launch.JarLauncher ${0} ${@}"] diff --git a/api/src/main/java/run/halo/app/extension/controller/DefaultController.java b/api/src/main/java/run/halo/app/extension/controller/DefaultController.java index 532acfd9f..895bb4735 100644 --- a/api/src/main/java/run/halo/app/extension/controller/DefaultController.java +++ b/api/src/main/java/run/halo/app/extension/controller/DefaultController.java @@ -2,15 +2,16 @@ package run.halo.app.extension.controller; import java.time.Duration; import java.time.Instant; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.IntStream; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.springframework.boot.system.JavaVersion; +import org.springframework.boot.task.SimpleAsyncTaskExecutorBuilder; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -32,7 +33,7 @@ public class DefaultController implements Controller { private volatile boolean started = false; - private final ExecutorService executor; + private final Executor executor; @Nullable private final Synchronizer synchronizer; @@ -53,6 +54,18 @@ public class DefaultController implements Controller { Duration minDelay, Duration maxDelay, ExecutorService executor, int workerCount) { + this(name, reconciler, queue, synchronizer, nowSupplier, minDelay, maxDelay, + (Executor) executor, workerCount); + } + + public DefaultController(String name, + Reconciler reconciler, + RequestQueue queue, + Synchronizer synchronizer, + Supplier nowSupplier, + Duration minDelay, + Duration maxDelay, + Executor executor, int workerCount) { Assert.isTrue(workerCount > 0, "Worker count must not be less than 1"); this.name = name; this.reconciler = reconciler; @@ -92,15 +105,17 @@ public class DefaultController implements Controller { Duration minDelay, Duration maxDelay, int workerCount) { this(name, reconciler, queue, synchronizer, nowSupplier, minDelay, maxDelay, - Executors.newFixedThreadPool(workerCount, threadFactory(name)), workerCount); + executor(workerCount, name), workerCount); } - private static ThreadFactory threadFactory(String name) { - return new BasicThreadFactory.Builder() - .namingPattern(name + "-t-%d") - .daemon(false) - .uncaughtExceptionHandler((t, e) -> - log.error("Controller " + t.getName() + " encountered an error unexpectedly", e)) + private static SimpleAsyncTaskExecutor executor(int workerCount, String name) { + boolean virtualThreads = + JavaVersion.getJavaVersion().isEqualOrNewerThan(JavaVersion.TWENTY_ONE); + return new SimpleAsyncTaskExecutorBuilder() + .virtualThreads(virtualThreads) + .concurrencyLimit(workerCount) + .taskTerminationTimeout(Duration.ofSeconds(10)) + .threadNamePrefix(name + "-") .build(); } @@ -123,7 +138,7 @@ public class DefaultController implements Controller { log.info("Starting controller {}", name); IntStream.range(0, getWorkerCount()) .mapToObj(i -> new Worker()) - .forEach(executor::submit); + .forEach(executor::execute); } /** @@ -226,14 +241,18 @@ public class DefaultController implements Controller { synchronizer.dispose(); } - executor.shutdownNow(); try { - if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { - log.warn("Wait timeout for controller {} shutdown", name); - } else { - log.info("Controller {} is disposed", name); + if (executor instanceof AutoCloseable closeable) { + closeable.close(); + if (Thread.currentThread().isInterrupted()) { + log.warn("Wait timeout for controller {} shutdown", name); + } else { + log.info("Controller {} is disposed", name); + } + } else if (executor instanceof ExecutorService executorService) { + closeExecutorService(executorService); } - } catch (InterruptedException e) { + } catch (Exception e) { log.warn("Interrupted while waiting for controller {} shutdown", name); } finally { queue.dispose(); @@ -248,4 +267,35 @@ public class DefaultController implements Controller { public boolean isStarted() { return started; } + + /** + * Close executor service. + *
+ * This method copied from + * ExecutorService#close implemented in JDK 21 + * + * @param executorService executor service to be closed + */ + // TODO Remove this method and use ExecutorService#close instead after using JDK 21 as the + // minimum version + private static void closeExecutorService(ExecutorService executorService) { + boolean terminated = executorService.isTerminated(); + if (!terminated) { + executorService.shutdown(); + boolean interrupted = false; + while (!terminated) { + try { + terminated = executorService.awaitTermination(1L, TimeUnit.MINUTES); + } catch (InterruptedException e) { + if (!interrupted) { + executorService.shutdownNow(); + interrupted = true; + } + } + } + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } } diff --git a/api/src/test/java/run/halo/app/extension/controller/DefaultControllerTest.java b/api/src/test/java/run/halo/app/extension/controller/DefaultControllerTest.java index 86f6d0d3c..476f1c7be 100644 --- a/api/src/test/java/run/halo/app/extension/controller/DefaultControllerTest.java +++ b/api/src/test/java/run/halo/app/extension/controller/DefaultControllerTest.java @@ -8,8 +8,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -18,7 +16,7 @@ import static org.mockito.Mockito.when; import java.time.Duration; import java.time.Instant; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -235,13 +233,16 @@ class DefaultControllerTest { verify(synchronizer, times(1)).dispose(); verify(queue, times(1)).dispose(); - verify(executor, times(1)).shutdownNow(); + verify(executor).shutdown(); + verify(executor, never()).shutdownNow(); verify(executor, times(1)).awaitTermination(anyLong(), any()); } @Test void shouldDisposeCorrectlyEvenIfTimeoutAwaitTermination() throws InterruptedException { - when(executor.awaitTermination(anyLong(), any())).thenThrow(InterruptedException.class); + when(executor.awaitTermination(anyLong(), any())) + .thenThrow(InterruptedException.class) + .thenReturn(true); controller.dispose(); @@ -250,46 +251,36 @@ class DefaultControllerTest { verify(synchronizer, times(1)).dispose(); verify(queue, times(1)).dispose(); + verify(executor).shutdown(); verify(executor, times(1)).shutdownNow(); - verify(executor, times(1)).awaitTermination(anyLong(), any()); + verify(executor, times(2)).awaitTermination(anyLong(), any()); } @Test - void shouldStartCorrectly() throws InterruptedException { - when(executor.submit(any(Runnable.class))).thenAnswer(invocation -> { - doNothing().when(synchronizer).start(); - when(queue.take()).thenThrow(InterruptedException.class); - - // invoke the task really - ((Runnable) invocation.getArgument(0)).run(); - return mock(Future.class); - }); + void shouldStartCorrectly() { controller.start(); - assertTrue(controller.isStarted()); assertFalse(controller.isDisposed()); - verify(executor, times(1)).submit(any(Runnable.class)); - verify(synchronizer, times(1)).start(); - verify(queue, times(1)).take(); - verify(reconciler, times(0)).reconcile(any()); + verify(executor).execute(any(Runnable.class)); } @Test - void shouldNotStartWhenDisposed() { + void shouldNotStartWhenDisposed() throws InterruptedException { + when(executor.awaitTermination(1, TimeUnit.MINUTES)).thenReturn(true); controller.dispose(); controller.start(); assertFalse(controller.isStarted()); assertTrue(controller.isDisposed()); - verify(executor, times(0)).submit(any(Runnable.class)); + verify(executor, times(0)).execute(any(Runnable.class)); } @Test void shouldCreateMultiWorkers() { controller = createController(5); controller.start(); - verify(executor, times(5)).submit(any(DefaultController.Worker.class)); + verify(executor, times(5)).execute(any(DefaultController.Worker.class)); } @Test diff --git a/application/src/main/resources/application.yaml b/application/src/main/resources/application.yaml index 29683cea5..a42536af8 100644 --- a/application/src/main/resources/application.yaml +++ b/application/src/main/resources/application.yaml @@ -34,6 +34,9 @@ spring: type: caffeine caffeine: spec: expireAfterAccess=1h, maximumSize=10000 + threads: + virtual: + enabled: true halo: work-dir: ${user.home}/.halo2