Enable Virtual Thread when running on JVM 21 (#7261)

#### What type of PR is this?

/kind improvement
/area core
/milestone 2.20.x

#### What this PR does / why we need it:

This PR enables Virtual Thread for instances running on JVM 21. This won't affect instances running on JVM 17.

References:
- https://spring.io/blog/2023/10/31/what-new-is-coming-in-reactor-core-3-6-0
- https://spring.io/blog/2022/10/11/embracing-virtual-threads
- https://docs.spring.io/spring-boot/3.4/reference/features/task-execution-and-scheduling.html

#### Does this PR introduce a user-facing change?

```release-note
None
```
pull/7268/head
John Niang 2025-03-05 10:32:57 +08:00 committed by GitHub
parent 8d9b2e6ee7
commit 00c8cbb7bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 86 additions and 42 deletions

View File

@ -25,4 +25,4 @@ RUN ln -sf /usr/share/zoneinfo/$TZ /etc/localtime \
Expose 8090 Expose 8090
ENTRYPOINT ["sh", "-c", "java ${JVM_OPTS} org.springframework.boot.loader.launch.JarLauncher ${0} ${@}"] ENTRYPOINT ["sh", "-c", "java -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true ${JVM_OPTS} org.springframework.boot.loader.launch.JarLauncher ${0} ${@}"]

View File

@ -2,15 +2,16 @@ package run.halo.app.extension.controller;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.springframework.boot.system.JavaVersion;
import org.springframework.boot.task.SimpleAsyncTaskExecutorBuilder;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -32,7 +33,7 @@ public class DefaultController<R> implements Controller {
private volatile boolean started = false; private volatile boolean started = false;
private final ExecutorService executor; private final Executor executor;
@Nullable @Nullable
private final Synchronizer<R> synchronizer; private final Synchronizer<R> synchronizer;
@ -53,6 +54,18 @@ public class DefaultController<R> implements Controller {
Duration minDelay, Duration minDelay,
Duration maxDelay, Duration maxDelay,
ExecutorService executor, int workerCount) { ExecutorService executor, int workerCount) {
this(name, reconciler, queue, synchronizer, nowSupplier, minDelay, maxDelay,
(Executor) executor, workerCount);
}
public DefaultController(String name,
Reconciler<R> reconciler,
RequestQueue<R> queue,
Synchronizer<R> synchronizer,
Supplier<Instant> nowSupplier,
Duration minDelay,
Duration maxDelay,
Executor executor, int workerCount) {
Assert.isTrue(workerCount > 0, "Worker count must not be less than 1"); Assert.isTrue(workerCount > 0, "Worker count must not be less than 1");
this.name = name; this.name = name;
this.reconciler = reconciler; this.reconciler = reconciler;
@ -92,15 +105,17 @@ public class DefaultController<R> implements Controller {
Duration minDelay, Duration minDelay,
Duration maxDelay, int workerCount) { Duration maxDelay, int workerCount) {
this(name, reconciler, queue, synchronizer, nowSupplier, minDelay, maxDelay, this(name, reconciler, queue, synchronizer, nowSupplier, minDelay, maxDelay,
Executors.newFixedThreadPool(workerCount, threadFactory(name)), workerCount); executor(workerCount, name), workerCount);
} }
private static ThreadFactory threadFactory(String name) { private static SimpleAsyncTaskExecutor executor(int workerCount, String name) {
return new BasicThreadFactory.Builder() boolean virtualThreads =
.namingPattern(name + "-t-%d") JavaVersion.getJavaVersion().isEqualOrNewerThan(JavaVersion.TWENTY_ONE);
.daemon(false) return new SimpleAsyncTaskExecutorBuilder()
.uncaughtExceptionHandler((t, e) -> .virtualThreads(virtualThreads)
log.error("Controller " + t.getName() + " encountered an error unexpectedly", e)) .concurrencyLimit(workerCount)
.taskTerminationTimeout(Duration.ofSeconds(10))
.threadNamePrefix(name + "-")
.build(); .build();
} }
@ -123,7 +138,7 @@ public class DefaultController<R> implements Controller {
log.info("Starting controller {}", name); log.info("Starting controller {}", name);
IntStream.range(0, getWorkerCount()) IntStream.range(0, getWorkerCount())
.mapToObj(i -> new Worker()) .mapToObj(i -> new Worker())
.forEach(executor::submit); .forEach(executor::execute);
} }
/** /**
@ -226,14 +241,18 @@ public class DefaultController<R> implements Controller {
synchronizer.dispose(); synchronizer.dispose();
} }
executor.shutdownNow();
try { try {
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { if (executor instanceof AutoCloseable closeable) {
log.warn("Wait timeout for controller {} shutdown", name); closeable.close();
} else { if (Thread.currentThread().isInterrupted()) {
log.info("Controller {} is disposed", name); log.warn("Wait timeout for controller {} shutdown", name);
} else {
log.info("Controller {} is disposed", name);
}
} else if (executor instanceof ExecutorService executorService) {
closeExecutorService(executorService);
} }
} catch (InterruptedException e) { } catch (Exception e) {
log.warn("Interrupted while waiting for controller {} shutdown", name); log.warn("Interrupted while waiting for controller {} shutdown", name);
} finally { } finally {
queue.dispose(); queue.dispose();
@ -248,4 +267,35 @@ public class DefaultController<R> implements Controller {
public boolean isStarted() { public boolean isStarted() {
return started; return started;
} }
/**
* Close executor service.
* <br>
* This method copied from
* <a href="https://github.com/openjdk/jdk/blob/890adb6410dab4606a4f26a942aed02fb2f55387/src/java.base/share/classes/java/util/concurrent/ExecutorService.java#L410-L429">ExecutorService#close implemented in JDK 21</a>
*
* @param executorService executor service to be closed
*/
// TODO Remove this method and use ExecutorService#close instead after using JDK 21 as the
// minimum version
private static void closeExecutorService(ExecutorService executorService) {
boolean terminated = executorService.isTerminated();
if (!terminated) {
executorService.shutdown();
boolean interrupted = false;
while (!terminated) {
try {
terminated = executorService.awaitTermination(1L, TimeUnit.MINUTES);
} catch (InterruptedException e) {
if (!interrupted) {
executorService.shutdownNow();
interrupted = true;
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
} }

View File

@ -8,8 +8,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -18,7 +16,7 @@ import static org.mockito.Mockito.when;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -235,13 +233,16 @@ class DefaultControllerTest {
verify(synchronizer, times(1)).dispose(); verify(synchronizer, times(1)).dispose();
verify(queue, times(1)).dispose(); verify(queue, times(1)).dispose();
verify(executor, times(1)).shutdownNow(); verify(executor).shutdown();
verify(executor, never()).shutdownNow();
verify(executor, times(1)).awaitTermination(anyLong(), any()); verify(executor, times(1)).awaitTermination(anyLong(), any());
} }
@Test @Test
void shouldDisposeCorrectlyEvenIfTimeoutAwaitTermination() throws InterruptedException { void shouldDisposeCorrectlyEvenIfTimeoutAwaitTermination() throws InterruptedException {
when(executor.awaitTermination(anyLong(), any())).thenThrow(InterruptedException.class); when(executor.awaitTermination(anyLong(), any()))
.thenThrow(InterruptedException.class)
.thenReturn(true);
controller.dispose(); controller.dispose();
@ -250,46 +251,36 @@ class DefaultControllerTest {
verify(synchronizer, times(1)).dispose(); verify(synchronizer, times(1)).dispose();
verify(queue, times(1)).dispose(); verify(queue, times(1)).dispose();
verify(executor).shutdown();
verify(executor, times(1)).shutdownNow(); verify(executor, times(1)).shutdownNow();
verify(executor, times(1)).awaitTermination(anyLong(), any()); verify(executor, times(2)).awaitTermination(anyLong(), any());
} }
@Test @Test
void shouldStartCorrectly() throws InterruptedException { void shouldStartCorrectly() {
when(executor.submit(any(Runnable.class))).thenAnswer(invocation -> {
doNothing().when(synchronizer).start();
when(queue.take()).thenThrow(InterruptedException.class);
// invoke the task really
((Runnable) invocation.getArgument(0)).run();
return mock(Future.class);
});
controller.start(); controller.start();
assertTrue(controller.isStarted()); assertTrue(controller.isStarted());
assertFalse(controller.isDisposed()); assertFalse(controller.isDisposed());
verify(executor, times(1)).submit(any(Runnable.class)); verify(executor).execute(any(Runnable.class));
verify(synchronizer, times(1)).start();
verify(queue, times(1)).take();
verify(reconciler, times(0)).reconcile(any());
} }
@Test @Test
void shouldNotStartWhenDisposed() { void shouldNotStartWhenDisposed() throws InterruptedException {
when(executor.awaitTermination(1, TimeUnit.MINUTES)).thenReturn(true);
controller.dispose(); controller.dispose();
controller.start(); controller.start();
assertFalse(controller.isStarted()); assertFalse(controller.isStarted());
assertTrue(controller.isDisposed()); assertTrue(controller.isDisposed());
verify(executor, times(0)).submit(any(Runnable.class)); verify(executor, times(0)).execute(any(Runnable.class));
} }
@Test @Test
void shouldCreateMultiWorkers() { void shouldCreateMultiWorkers() {
controller = createController(5); controller = createController(5);
controller.start(); controller.start();
verify(executor, times(5)).submit(any(DefaultController.Worker.class)); verify(executor, times(5)).execute(any(DefaultController.Worker.class));
} }
@Test @Test

View File

@ -34,6 +34,9 @@ spring:
type: caffeine type: caffeine
caffeine: caffeine:
spec: expireAfterAccess=1h, maximumSize=10000 spec: expireAfterAccess=1h, maximumSize=10000
threads:
virtual:
enabled: true
halo: halo:
work-dir: ${user.home}/.halo2 work-dir: ${user.home}/.halo2