fix: cannot stop thread when system is interrupted (#2639)

#### What type of PR is this?
/kind bug
/kind improvement
/area core
/milestone 2.0

#### What this PR does / why we need it:
修复 Halo 异常停止时日志服务线程无法中断的问题

#### Special notes for your reviewer:
how to test it?
使用 mysql 启动 halo 但不启动 mysql,此时 halo 会无法链接 mysql
期望现象:Halo 服务终止,异常现象:Halo 抛异常但 Netty 服务器不会停止

/cc @halo-dev/sig-halo 
#### Does this PR introduce a user-facing change?

```release-note
修复 Halo 异常停止时日志服务线程无法中断的问题
```
pull/2663/head v2.0.0-alpha.4
guqing 2022-11-03 14:32:19 +08:00 committed by GitHub
parent 73df5e4576
commit 9b4ed96e2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 139 additions and 69 deletions

View File

@ -9,8 +9,8 @@ import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -29,8 +29,8 @@ import run.halo.app.infra.utils.JsonUtils;
*/ */
@Slf4j @Slf4j
@Component @Component
public class CounterMeterHandler implements DisposableBean { public class CounterMeterHandler implements SmartLifecycle {
private volatile boolean started = false;
private final ReactiveExtensionClient client; private final ReactiveExtensionClient client;
private final MeterRegistry meterRegistry; private final MeterRegistry meterRegistry;
@ -135,8 +135,23 @@ public class CounterMeterHandler implements DisposableBean {
} }
@Override @Override
public void destroy() { public void start() {
this.started = true;
}
@Override
public void stop() {
log.debug("Persist counter meters to database before destroy..."); log.debug("Persist counter meters to database before destroy...");
save().block(); try {
save().block();
} catch (Exception e) {
log.error("Persist counter meters to database failed.", e);
}
this.started = false;
}
@Override
public boolean isRunning() {
return started;
} }
} }

View File

@ -7,16 +7,17 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayDeque; import java.util.Queue;
import java.util.Deque; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import run.halo.app.infra.properties.HaloProperties; import run.halo.app.infra.properties.HaloProperties;
import run.halo.app.infra.utils.FileUtils; import run.halo.app.infra.utils.FileUtils;
@ -33,10 +34,12 @@ public class VisitLogWriter implements InitializingBean, DisposableBean {
private static final String LOG_FILE_LOCATION = "logs"; private static final String LOG_FILE_LOCATION = "logs";
private final AsyncLogWriter asyncLogWriter; private final AsyncLogWriter asyncLogWriter;
private volatile boolean interruptThread = false; private volatile boolean interruptThread = false;
private volatile boolean started = false;
private final ExecutorService executorService;
private final Path logFilePath; private final Path logFilePath;
private VisitLogWriter(HaloProperties haloProperties) throws IOException { public VisitLogWriter(HaloProperties haloProperties) throws IOException {
Path logsPath = haloProperties.getWorkDir() Path logsPath = haloProperties.getWorkDir()
.resolve(LOG_FILE_LOCATION); .resolve(LOG_FILE_LOCATION);
if (!Files.exists(logsPath)) { if (!Files.exists(logsPath)) {
@ -44,10 +47,15 @@ public class VisitLogWriter implements InitializingBean, DisposableBean {
} }
this.logFilePath = logsPath.resolve(LOG_FILE_NAME); this.logFilePath = logsPath.resolve(LOG_FILE_NAME);
this.asyncLogWriter = new AsyncLogWriter(logFilePath); this.asyncLogWriter = new AsyncLogWriter(logFilePath);
this.executorService = Executors.newFixedThreadPool(1);
} }
public synchronized void log(String logMsg) { public void log(String logMsg) {
asyncLogWriter.put(logMsg); try {
asyncLogWriter.put(logMsg);
} catch (InterruptedException e) {
log.error("Failed to log visit log: {}", ExceptionUtils.getStackTrace(e));
}
} }
public Path getLogFilePath() { public Path getLogFilePath() {
@ -55,13 +63,22 @@ public class VisitLogWriter implements InitializingBean, DisposableBean {
} }
void start() { void start() {
if (started) {
return;
}
log.debug("Starting write visit log..."); log.debug("Starting write visit log...");
Thread thread = new Thread(() -> { this.started = true;
while (!interruptThread) { executorService.submit(() -> {
asyncLogWriter.writeLog(); while (!interruptThread && !Thread.currentThread().isInterrupted()) {
try {
asyncLogWriter.writeLog();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("VisitLogWrite thread [{}] interrupted",
Thread.currentThread().getName());
}
} }
}, "visits-log-writer"); });
thread.start();
} }
@Override @Override
@ -69,21 +86,29 @@ public class VisitLogWriter implements InitializingBean, DisposableBean {
start(); start();
} }
@Override public boolean isStarted() {
public void destroy() throws Exception { return started;
asyncLogWriter.close();
interruptThread = true;
} }
static class AsyncLogWriter { public long queuedSize() {
return asyncLogWriter.logQueue.size();
}
@Override
public void destroy() throws Exception {
this.started = false;
interruptThread = true;
asyncLogWriter.dispose();
executorService.shutdown();
}
static class AsyncLogWriter implements Disposable {
private static final int MAX_LOG_SIZE = 10000; private static final int MAX_LOG_SIZE = 10000;
private static final int BATCH_SIZE = 10; private static final int BATCH_SIZE = 10;
private final ReentrantLock lock = new ReentrantLock();
private final Condition fullCondition = lock.newCondition();
private final Condition emptyCondition = lock.newCondition();
private final BufferedOutputStream writer; private final BufferedOutputStream writer;
private final Deque<String> logQueue; private final Queue<String> logQueue;
private final AtomicInteger logBatch = new AtomicInteger(0); private final AtomicInteger logBatch = new AtomicInteger(0);
private volatile boolean disposed = false;
public AsyncLogWriter(Path logFilePath) { public AsyncLogWriter(Path logFilePath) {
OutputStream outputStream; OutputStream outputStream;
@ -95,33 +120,20 @@ public class VisitLogWriter implements InitializingBean, DisposableBean {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.writer = new BufferedOutputStream(outputStream); this.writer = new BufferedOutputStream(outputStream);
this.logQueue = new ArrayDeque<>(); this.logQueue = new LinkedBlockingDeque<>(MAX_LOG_SIZE);
} }
public void writeLog() { public void writeLog() throws InterruptedException {
lock.lock(); if (logQueue.isEmpty()) {
try { return;
// queue is empty, wait for new log
while (logQueue.isEmpty()) {
try {
emptyCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String logMessage = logQueue.poll();
writeToDisk(logMessage);
log.debug("Consumption visit log message: [{}]", logMessage);
// signal log producer
fullCondition.signal();
} finally {
lock.unlock();
} }
String logMessage = logQueue.poll();
writeToDisk(logMessage);
log.debug("Consumption visit log message: [{}]", logMessage);
} }
void writeToDisk(String logMsg) { void writeToDisk(String logMsg) throws InterruptedException {
String format = String.format("%s %s\n", Instant.now(), logMsg); String format = String.format("%s %s\n", Instant.now(), logMsg);
lock.lock();
try { try {
writer.write(format.getBytes(), 0, format.length()); writer.write(format.getBytes(), 0, format.length());
int size = logBatch.incrementAndGet(); int size = logBatch.incrementAndGet();
@ -131,33 +143,18 @@ public class VisitLogWriter implements InitializingBean, DisposableBean {
} }
} catch (IOException e) { } catch (IOException e) {
log.warn("Record access log failure: ", ExceptionUtils.getRootCause(e)); log.warn("Record access log failure: ", ExceptionUtils.getRootCause(e));
} finally {
lock.unlock();
} }
} }
public void put(String logMessage) { public void put(String logMessage) throws InterruptedException {
lock.lock(); // add log message to queue tail
try { logQueue.add(logMessage);
while (logQueue.size() == MAX_LOG_SIZE) { log.info("Production a log messages [{}]", logMessage);
try {
log.debug("Queue full, producer thread waiting...");
fullCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// add log message to queue tail
logQueue.add(logMessage);
log.info("Production a log messages [{}]", logMessage);
// signal consumer thread
emptyCondition.signal();
} finally {
lock.unlock();
}
} }
public void close() { @Override
public void dispose() {
this.disposed = true;
if (writer != null) { if (writer != null) {
try { try {
writer.flush(); writer.flush();
@ -167,5 +164,10 @@ public class VisitLogWriter implements InitializingBean, DisposableBean {
FileUtils.closeQuietly(writer); FileUtils.closeQuietly(writer);
} }
} }
@Override
public boolean isDisposed() {
return this.disposed;
}
} }
} }

View File

@ -0,0 +1,53 @@
package run.halo.app.metrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.util.FileSystemUtils;
import run.halo.app.infra.properties.HaloProperties;
/**
* Tests for {@link VisitLogWriter}.
*
* @author guqing
* @since 2.0.0
*/
@ExtendWith(MockitoExtension.class)
class VisitLogWriterTest {
@Mock
private HaloProperties haloProperties;
private VisitLogWriter visitLogWriter;
private Path workDir;
@BeforeEach
void setUp() throws IOException {
workDir = Files.createTempDirectory("halo-visitlog");
when(haloProperties.getWorkDir()).thenReturn(workDir);
visitLogWriter = new VisitLogWriter(haloProperties);
}
@AfterEach
void tearDown() throws Exception {
visitLogWriter.destroy();
FileSystemUtils.deleteRecursively(workDir);
}
@Test
void start() {
assertThat(visitLogWriter.isStarted()).isFalse();
visitLogWriter.start();
assertThat(visitLogWriter.isStarted()).isTrue();
}
}