From c80c5e23c63a03a53b50a547851419a4ce2129ee Mon Sep 17 00:00:00 2001 From: John Niang Date: Sat, 12 Aug 2023 17:14:11 +0800 Subject: [PATCH] Refactor the transformation between data buffers and input stream (#4391) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### What type of PR is this? /kind cleanup /area core /milestone 2.9.x #### What this PR does / why we need it: Before this, If we use a file with length less than 256KB for recovery, the process remains stagnant until we cancel the request. This PR refactors the transformation between data buffers and input stream and resolve the issue above. We should avoid returning InputStream directly in reactive stream. - DataBufferUtils before ```java public static InputStream toInputStream(Flux content) throws IOException { var pos = new PipedOutputStream(); var pis = new PipedInputStream(pos); write(content, pos) .doOnComplete(() -> { try { pos.close(); } catch (IOException ignored) { // Ignore the error } }) .subscribeOn(Schedulers.boundedElastic()) .subscribe(releaseConsumer(), error -> { if (error instanceof IOException) { // Ignore the error return; } log.error("Failed to write DataBuffer into OutputStream", error); }); return pis; ``` - DataBufferUtils after ```java public static Mono toInputStream(Publisher content, Scheduler scheduler) { return Mono.create(sink -> { try { var pos = new PipedOutputStream(); var pis = new PipedInputStream(pos); var disposable = write(content, pos) .subscribeOn(scheduler) .subscribe(releaseConsumer(), sink::error, () -> FileUtils.closeQuietly(pos), Context.of(sink.contextView())); sink.onDispose(disposable); sink.success(pis); } catch (IOException e) { sink.error(e); } }); ``` #### Special notes for your reviewer: Please test for plugins, themes and migrations. #### Does this PR introduce a user-facing change? ```release-note 解决备份恢复时因文件小于 256KB 而导致接口卡住的问题。 ``` --- .../extension/endpoint/PluginEndpoint.java | 111 +++----- .../core/extension/theme/ThemeEndpoint.java | 133 ++++----- .../core/extension/theme/ThemeService.java | 7 +- .../extension/theme/ThemeServiceImpl.java | 121 ++++---- .../app/core/extension/theme/ThemeUtils.java | 153 +++++----- .../app/infra/DefaultThemeInitializer.java | 24 +- .../halo/app/infra/utils/DataBufferUtils.java | 47 +-- .../run/halo/app/infra/utils/FileUtils.java | 57 +++- .../migration/impl/MigrationServiceImpl.java | 268 ++++++++---------- .../extension/theme/ThemeEndpointTest.java | 51 ++-- .../extension/theme/ThemeServiceImplTest.java | 57 ++-- 11 files changed, 475 insertions(+), 554 deletions(-) diff --git a/application/src/main/java/run/halo/app/core/extension/endpoint/PluginEndpoint.java b/application/src/main/java/run/halo/app/core/extension/endpoint/PluginEndpoint.java index 2426e8b81..430798aef 100644 --- a/application/src/main/java/run/halo/app/core/extension/endpoint/PluginEndpoint.java +++ b/application/src/main/java/run/halo/app/core/extension/endpoint/PluginEndpoint.java @@ -9,16 +9,16 @@ import static org.springdoc.core.fn.builders.parameter.Builder.parameterBuilder; import static org.springdoc.core.fn.builders.requestbody.Builder.requestBodyBuilder; import static org.springdoc.core.fn.builders.schema.Builder.schemaBuilder; import static org.springframework.boot.convert.ApplicationConversionService.getSharedInstance; +import static org.springframework.core.io.buffer.DataBufferUtils.write; import static org.springframework.web.reactive.function.server.RequestPredicates.contentType; import static run.halo.app.extension.ListResult.generateGenericClass; import static run.halo.app.extension.router.QueryParamBuildUtil.buildParametersFromType; import static run.halo.app.extension.router.selector.SelectorUtil.labelAndFieldSelectorToPredicate; +import static run.halo.app.infra.utils.FileUtils.deleteFileSilently; import io.swagger.v3.oas.annotations.enums.ParameterIn; import io.swagger.v3.oas.annotations.media.ArraySchema; import io.swagger.v3.oas.annotations.media.Schema; -import java.io.IOException; -import java.io.InputStream; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; @@ -33,7 +33,9 @@ import java.util.function.Function; import java.util.function.Predicate; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; import org.springdoc.webflux.core.fn.SpringdocRouteBuilder; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.domain.Sort; import org.springframework.http.MediaType; @@ -41,7 +43,6 @@ import org.springframework.http.codec.multipart.FilePart; import org.springframework.http.codec.multipart.FormFieldPart; import org.springframework.http.codec.multipart.Part; import org.springframework.stereotype.Component; -import org.springframework.util.FileCopyUtils; import org.springframework.util.MultiValueMap; import org.springframework.util.StringUtils; import org.springframework.web.reactive.function.server.RouterFunction; @@ -50,6 +51,7 @@ import org.springframework.web.reactive.function.server.ServerResponse; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebInputException; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; import run.halo.app.core.extension.Plugin; @@ -61,9 +63,6 @@ import run.halo.app.extension.ConfigMap; import run.halo.app.extension.ReactiveExtensionClient; import run.halo.app.extension.router.IListRequest.QueryListRequest; import run.halo.app.infra.ReactiveUrlDataBufferFetcher; -import run.halo.app.infra.exception.ThemeInstallationException; -import run.halo.app.infra.exception.ThemeUpgradeException; -import run.halo.app.infra.utils.DataBufferUtils; import run.halo.app.plugin.PluginNotFoundException; @Slf4j @@ -77,6 +76,8 @@ public class PluginEndpoint implements CustomEndpoint { private final ReactiveUrlDataBufferFetcher reactiveUrlDataBufferFetcher; + private final Scheduler scheduler = Schedulers.boundedElastic(); + @Override public RouterFunction endpoint() { final var tag = "api.console.halo.run/v1alpha1/Plugin"; @@ -221,48 +222,29 @@ public class PluginEndpoint implements CustomEndpoint { } private Mono upgradeFromUri(ServerRequest request) { - final var name = request.pathVariable("name"); - return request.bodyToMono(UpgradeFromUriRequest.class) - .flatMap(upgradeRequest -> Mono.fromCallable(() -> DataBufferUtils.toInputStream( - reactiveUrlDataBufferFetcher.fetch(upgradeRequest.uri()))) - ) - .subscribeOn(Schedulers.boundedElastic()) - .onErrorMap(throwable -> { - log.error("Failed to fetch plugin file from uri.", throwable); - return new ThemeUpgradeException("Failed to fetch plugin file from uri.", null, - null); - }) - .flatMap(inputStream -> Mono.usingWhen( - transferToTemp(inputStream), - (path) -> pluginService.upgrade(name, path), + var name = request.pathVariable("name"); + var content = request.bodyToMono(UpgradeFromUriRequest.class) + .map(UpgradeFromUriRequest::uri) + .flatMapMany(reactiveUrlDataBufferFetcher::fetch); + + return Mono.usingWhen( + writeToTempFile(content), + path -> pluginService.upgrade(name, path), this::deleteFileIfExists) - ) - .flatMap(theme -> ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(theme) - ); + .flatMap(upgradedPlugin -> ServerResponse.ok().bodyValue(upgradedPlugin)); } private Mono installFromUri(ServerRequest request) { - return request.bodyToMono(InstallFromUriRequest.class) - .flatMap(installRequest -> Mono.fromCallable(() -> DataBufferUtils.toInputStream( - reactiveUrlDataBufferFetcher.fetch(installRequest.uri()))) - ) - .subscribeOn(Schedulers.boundedElastic()) - .doOnError(throwable -> { - log.error("Failed to fetch plugin file from uri.", throwable); - throw new ThemeInstallationException("Failed to fetch plugin file from uri.", null, - null); - }) - .flatMap(inputStream -> Mono.usingWhen( - transferToTemp(inputStream), + var content = request.bodyToMono(InstallFromUriRequest.class) + .map(InstallFromUriRequest::uri) + .flatMapMany(reactiveUrlDataBufferFetcher::fetch); + + return Mono.usingWhen( + writeToTempFile(content), pluginService::install, - this::deleteFileIfExists) + this::deleteFileIfExists ) - .flatMap(theme -> ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(theme) - ); + .flatMap(newPlugin -> ServerResponse.ok().bodyValue(newPlugin)); } public record InstallFromUriRequest(@Schema(requiredMode = REQUIRED) URI uri) { @@ -402,10 +384,12 @@ public class PluginEndpoint implements CustomEndpoint { .bodyValue(upgradedPlugin)); } - private Mono installFromFile(Mono filePartMono, + private Mono installFromFile(FilePart filePart, Function> resourceClosure) { - var pathMono = filePartMono.flatMap(this::transferToTemp); - return Mono.usingWhen(pathMono, resourceClosure, this::deleteFileIfExists); + return Mono.usingWhen( + writeToTempFile(filePart.content()), + resourceClosure, + this::deleteFileIfExists); } private Mono installFromPreset(Mono presetNameMono, @@ -529,19 +513,18 @@ public class PluginEndpoint implements CustomEndpoint { } @Schema(requiredMode = NOT_REQUIRED, description = "Plugin Jar file.") - public Mono getFile() { + public FilePart getFile() { var part = multipartData.getFirst("file"); if (part == null) { - return Mono.error(new ServerWebInputException("Form field file is required")); + throw new ServerWebInputException("Form field file is required"); } if (!(part instanceof FilePart file)) { - return Mono.error(new ServerWebInputException("Invalid parameter of file")); + throw new ServerWebInputException("Invalid parameter of file"); } if (!Paths.get(file.filename()).toString().endsWith(".jar")) { - return Mono.error( - new ServerWebInputException("Invalid file type, only jar is supported")); + throw new ServerWebInputException("Invalid file type, only jar is supported"); } - return Mono.just(file); + return file; } @Schema(requiredMode = NOT_REQUIRED, @@ -584,29 +567,13 @@ public class PluginEndpoint implements CustomEndpoint { } Mono deleteFileIfExists(Path path) { - return Mono.fromRunnable(() -> { - try { - Files.deleteIfExists(path); - } catch (IOException e) { - // ignore this error - log.warn("Failed to delete temporary jar file: {}", path, e); - } - }).subscribeOn(Schedulers.boundedElastic()).then(); + return deleteFileSilently(path, this.scheduler).then(); } - private Mono transferToTemp(FilePart filePart) { - return Mono.fromCallable(() -> Files.createTempFile("halo-plugins", ".jar")) - .subscribeOn(Schedulers.boundedElastic()) - .flatMap(path -> filePart.transferTo(path) - .thenReturn(path) - ); + private Mono writeToTempFile(Publisher content) { + return Mono.fromCallable(() -> Files.createTempFile("halo-plugin-", ".jar")) + .flatMap(path -> write(content, path).thenReturn(path)) + .subscribeOn(this.scheduler); } - private Mono transferToTemp(InputStream inputStream) { - return Mono.fromCallable(() -> { - Path tempFile = Files.createTempFile("halo-plugins", ".jar"); - FileCopyUtils.copy(inputStream, Files.newOutputStream(tempFile)); - return tempFile; - }).subscribeOn(Schedulers.boundedElastic()); - } } diff --git a/application/src/main/java/run/halo/app/core/extension/theme/ThemeEndpoint.java b/application/src/main/java/run/halo/app/core/extension/theme/ThemeEndpoint.java index 0ce93cb14..233abad3e 100644 --- a/application/src/main/java/run/halo/app/core/extension/theme/ThemeEndpoint.java +++ b/application/src/main/java/run/halo/app/core/extension/theme/ThemeEndpoint.java @@ -7,11 +7,9 @@ import static org.springdoc.core.fn.builders.parameter.Builder.parameterBuilder; import static org.springdoc.core.fn.builders.requestbody.Builder.requestBodyBuilder; import static org.springdoc.core.fn.builders.schema.Builder.schemaBuilder; import static org.springframework.web.reactive.function.server.RequestPredicates.contentType; -import static run.halo.app.infra.utils.DataBufferUtils.toInputStream; import io.swagger.v3.oas.annotations.enums.ParameterIn; import io.swagger.v3.oas.annotations.media.Schema; -import java.io.IOException; import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; @@ -29,14 +27,11 @@ import org.springframework.lang.NonNull; import org.springframework.stereotype.Component; import org.springframework.util.Assert; import org.springframework.util.MultiValueMap; -import org.springframework.web.reactive.function.BodyExtractors; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import org.springframework.web.server.ServerWebInputException; -import reactor.core.Exceptions; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; import run.halo.app.core.extension.Setting; import run.halo.app.core.extension.Theme; @@ -51,9 +46,6 @@ import run.halo.app.infra.SystemConfigurableEnvironmentFetcher; import run.halo.app.infra.SystemSetting; import run.halo.app.infra.ThemeRootGetter; import run.halo.app.infra.exception.NotFoundException; -import run.halo.app.infra.exception.ThemeInstallationException; -import run.halo.app.infra.exception.ThemeUpgradeException; -import run.halo.app.infra.utils.DataBufferUtils; import run.halo.app.infra.utils.JsonUtils; import run.halo.app.theme.TemplateEngineManager; @@ -78,7 +70,7 @@ public class ThemeEndpoint implements CustomEndpoint { private final SystemConfigurableEnvironmentFetcher systemEnvironmentFetcher; - private final ReactiveUrlDataBufferFetcher reactiveUrlDataBufferFetcher; + private final ReactiveUrlDataBufferFetcher urlDataBufferFetcher; @Override public RouterFunction endpoint() { @@ -244,43 +236,25 @@ public class ThemeEndpoint implements CustomEndpoint { private Mono upgradeFromUri(ServerRequest request) { final var name = request.pathVariable("name"); - return request.bodyToMono(UpgradeFromUriRequest.class) - .flatMap(upgradeRequest -> Mono.fromCallable(() -> DataBufferUtils.toInputStream( - reactiveUrlDataBufferFetcher.fetch(upgradeRequest.uri()))) + var content = request.bodyToMono(UpgradeFromUriRequest.class) + .map(UpgradeFromUriRequest::uri) + .flatMapMany(urlDataBufferFetcher::fetch); + + return themeService.upgrade(name, content) + .flatMap((updatedTheme) -> + templateEngineManager.clearCache(updatedTheme.getMetadata().getName()) + .thenReturn(updatedTheme) ) - .subscribeOn(Schedulers.boundedElastic()) - .doOnError(throwable -> { - log.error("Failed to fetch zip file from uri.", throwable); - throw new ThemeUpgradeException("Failed to fetch zip file from uri.", null, - null); - }) - .flatMap(inputStream -> themeService.upgrade(name, inputStream)) - .flatMap((updatedTheme) -> templateEngineManager.clearCache( - updatedTheme.getMetadata().getName()) - .thenReturn(updatedTheme) - ) - .flatMap(theme -> ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(theme) - ); + .flatMap(theme -> ServerResponse.ok().bodyValue(theme)); } private Mono installFromUri(ServerRequest request) { - return request.bodyToMono(InstallFromUriRequest.class) - .flatMap(installRequest -> Mono.fromCallable(() -> DataBufferUtils.toInputStream( - reactiveUrlDataBufferFetcher.fetch(installRequest.uri()))) - ) - .subscribeOn(Schedulers.boundedElastic()) - .doOnError(throwable -> { - log.error("Failed to fetch zip file from uri.", throwable); - throw new ThemeInstallationException("Failed to fetch zip file from uri.", null, - null); - }) - .flatMap(themeService::install) - .flatMap(theme -> ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(theme) - ); + var content = request.bodyToMono(InstallFromUriRequest.class) + .map(InstallFromUriRequest::uri) + .flatMapMany(urlDataBufferFetcher::fetch); + + return themeService.install(content) + .flatMap(theme -> ServerResponse.ok().bodyValue(theme)); } private Mono activateTheme(ServerRequest request) { @@ -328,7 +302,7 @@ public class ThemeEndpoint implements CustomEndpoint { if (!configMapName.equals(configMapNameToUpdate)) { throw new ServerWebInputException( "The name from the request body does not match the theme " - + "configMapName name."); + + "configMapName name."); } }) .flatMap(configMapToUpdate -> client.fetch(ConfigMap.class, configMapName) @@ -442,22 +416,15 @@ public class ThemeEndpoint implements CustomEndpoint { private Mono upgrade(ServerRequest request) { // validate the theme first - var themeNameInPath = request.pathVariable("name"); + var name = request.pathVariable("name"); return request.multipartData() .map(UpgradeRequest::new) .map(UpgradeRequest::getFile) - .flatMap(file -> { - try { - return themeService.upgrade(themeNameInPath, toInputStream(file.content())); - } catch (IOException e) { - return Mono.error(e); - } - }) - .flatMap((updatedTheme) -> templateEngineManager.clearCache( - updatedTheme.getMetadata().getName()) - .thenReturn(updatedTheme)) - .flatMap(updatedTheme -> ServerResponse.ok() - .bodyValue(updatedTheme)); + .flatMap(filePart -> themeService.upgrade(name, filePart.content())) + .flatMap((updatedTheme) -> + templateEngineManager.clearCache(updatedTheme.getMetadata().getName()) + .thenReturn(updatedTheme)) + .flatMap(updatedTheme -> ServerResponse.ok().bodyValue(updatedTheme)); } Mono> listUninstalled(ThemeQuery query) { @@ -500,39 +467,39 @@ public class ThemeEndpoint implements CustomEndpoint { } @Schema(name = "ThemeInstallRequest") - public record InstallRequest( - @Schema(requiredMode = REQUIRED, description = "Theme zip file.") FilePart file) { + public static class InstallRequest { + + @Schema(hidden = true) + private final MultiValueMap multipartData; + + public InstallRequest(MultiValueMap multipartData) { + this.multipartData = multipartData; + } + + @Schema(requiredMode = REQUIRED, description = "Theme zip file.") + FilePart getFile() { + Part part = multipartData.getFirst("file"); + if (!(part instanceof FilePart file)) { + throw new ServerWebInputException( + "Invalid parameter of file, binary data is required"); + } + if (!Paths.get(file.filename()).toString().endsWith(".zip")) { + throw new ServerWebInputException( + "Invalid file type, only zip format is supported"); + } + return file; + } } public record InstallFromUriRequest(@Schema(requiredMode = REQUIRED) URI uri) { } Mono install(ServerRequest request) { - return request.body(BodyExtractors.toMultipartData()) - .flatMap(this::getZipFilePart) - .flatMap(file -> { - try { - return themeService.install(toInputStream(file.content())); - } catch (IOException e) { - return Mono.error(Exceptions.propagate(e)); - } - }) - .flatMap(theme -> ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(theme)); - } - - Mono getZipFilePart(MultiValueMap formData) { - Part part = formData.getFirst("file"); - if (!(part instanceof FilePart file)) { - return Mono.error(new ServerWebInputException( - "Invalid parameter of file, binary data is required")); - } - if (!Paths.get(file.filename()).toString().endsWith(".zip")) { - return Mono.error(new ServerWebInputException( - "Invalid file type, only zip format is supported")); - } - return Mono.just(file); + return request.multipartData() + .map(InstallRequest::new) + .map(InstallRequest::getFile) + .flatMap(filePart -> themeService.install(filePart.content())) + .flatMap(theme -> ServerResponse.ok().bodyValue(theme)); } } diff --git a/application/src/main/java/run/halo/app/core/extension/theme/ThemeService.java b/application/src/main/java/run/halo/app/core/extension/theme/ThemeService.java index 3bfe7e2e3..a3954a3fc 100644 --- a/application/src/main/java/run/halo/app/core/extension/theme/ThemeService.java +++ b/application/src/main/java/run/halo/app/core/extension/theme/ThemeService.java @@ -1,15 +1,16 @@ package run.halo.app.core.extension.theme; -import java.io.InputStream; +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; import reactor.core.publisher.Mono; import run.halo.app.core.extension.Theme; import run.halo.app.extension.ConfigMap; public interface ThemeService { - Mono install(InputStream is); + Mono install(Publisher content); - Mono upgrade(String themeName, InputStream is); + Mono upgrade(String themeName, Publisher content); Mono reloadTheme(String name); diff --git a/application/src/main/java/run/halo/app/core/extension/theme/ThemeServiceImpl.java b/application/src/main/java/run/halo/app/core/extension/theme/ThemeServiceImpl.java index 44c9f1bf6..df519bd8a 100644 --- a/application/src/main/java/run/halo/app/core/extension/theme/ThemeServiceImpl.java +++ b/application/src/main/java/run/halo/app/core/extension/theme/ThemeServiceImpl.java @@ -1,34 +1,33 @@ package run.halo.app.core.extension.theme; -import static java.nio.file.Files.createTempDirectory; import static org.springframework.util.FileSystemUtils.copyRecursively; import static run.halo.app.core.extension.theme.ThemeUtils.loadThemeManifest; import static run.halo.app.core.extension.theme.ThemeUtils.locateThemeManifest; +import static run.halo.app.core.extension.theme.ThemeUtils.unzipThemeTo; +import static run.halo.app.infra.utils.FileUtils.createTempDir; import static run.halo.app.infra.utils.FileUtils.deleteRecursivelyAndSilently; import static run.halo.app.infra.utils.FileUtils.unzip; -import java.io.IOException; -import java.io.InputStream; import java.nio.file.Path; import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; -import java.util.zip.ZipInputStream; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.retry.RetryException; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import org.springframework.web.server.ServerErrorException; import org.springframework.web.server.ServerWebInputException; -import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; import run.halo.app.core.extension.AnnotationSetting; @@ -55,76 +54,60 @@ public class ThemeServiceImpl implements ThemeService { private final SystemVersionSupplier systemVersionSupplier; + private final Scheduler scheduler = Schedulers.boundedElastic(); + @Override - public Mono install(InputStream is) { + public Mono install(Publisher content) { var themeRoot = this.themeRoot.get(); - return ThemeUtils.unzipThemeTo(is, themeRoot) + return unzipThemeTo(content, themeRoot, scheduler) .flatMap(this::persistent); } @Override - public Mono upgrade(String themeName, InputStream is) { - var tempDir = new AtomicReference(); - var tempThemeRoot = new AtomicReference(); - return client.fetch(Theme.class, themeName) + public Mono upgrade(String themeName, Publisher content) { + var checkTheme = client.fetch(Theme.class, themeName) .switchIfEmpty(Mono.error(() -> new ServerWebInputException( - "The given theme with name " + themeName + " did not exist"))) - .publishOn(Schedulers.boundedElastic()) - .doFirst(() -> { - try { - tempDir.set(createTempDirectory("halo-theme-")); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }) - .flatMap(oldTheme -> { - try (var zis = new ZipInputStream(is)) { - unzip(zis, tempDir.get()); - return locateThemeManifest(tempDir.get()).switchIfEmpty(Mono.error( - () -> new ThemeUpgradeException( - "Missing theme manifest file: theme.yaml or theme.yml", - "problemDetail.theme.upgrade.missingManifest", null))); - } catch (IOException e) { - return Mono.error(e); - } - }) - .doOnNext(themeManifest -> { - if (log.isDebugEnabled()) { - log.debug("Found theme manifest file: {}", themeManifest); - } - tempThemeRoot.set(themeManifest.getParent()); - }) - .map(ThemeUtils::loadThemeManifest) - .doOnNext(newTheme -> { - if (!Objects.equals(themeName, newTheme.getMetadata().getName())) { - if (log.isDebugEnabled()) { - log.error("Want theme name: {}, but provided: {}", themeName, - newTheme.getMetadata().getName()); - } - throw new ThemeUpgradeException("Please make sure the theme name is correct", - "problemDetail.theme.upgrade.nameMismatch", - new Object[] {newTheme.getMetadata().getName(), themeName}); - } - }) - .flatMap(newTheme -> { - // Remove the theme before upgrading - return deleteThemeAndWaitForComplete(newTheme.getMetadata().getName()) - .thenReturn(newTheme); - }) - .doOnNext(newTheme -> { - // prepare the theme - var themePath = themeRoot.get().resolve(newTheme.getMetadata().getName()); - try { - copyRecursively(tempThemeRoot.get(), themePath); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }) - .flatMap(this::persistent) - .doFinally(signalType -> { - // clear the temporary folder - deleteRecursivelyAndSilently(tempDir.get()); - }); + "The given theme with name " + themeName + " did not exist"))); + var upgradeTheme = Mono.usingWhen( + createTempDir("halo-theme-", scheduler), + tempDir -> { + var locateThemeManifest = Mono.fromCallable(() -> locateThemeManifest(tempDir) + .orElseThrow(() -> new ThemeUpgradeException( + "Missing theme manifest file: theme.yaml or theme.yml", + "problemDetail.theme.upgrade.missingManifest", null))); + return unzip(content, tempDir, scheduler) + .then(locateThemeManifest) + .flatMap(themeManifest -> { + if (log.isDebugEnabled()) { + log.debug("Found theme manifest file: {}", themeManifest); + } + var newTheme = loadThemeManifest(themeManifest); + if (!Objects.equals(themeName, newTheme.getMetadata().getName())) { + if (log.isDebugEnabled()) { + log.error("Want theme name: {}, but provided: {}", themeName, + newTheme.getMetadata().getName()); + } + return Mono.error(new ThemeUpgradeException( + "Please make sure the theme name is correct", + "problemDetail.theme.upgrade.nameMismatch", + new Object[] {newTheme.getMetadata().getName(), themeName})); + } + + var copyTheme = Mono.fromCallable(() -> { + var themePath = themeRoot.get().resolve(themeName); + copyRecursively(themeManifest.getParent(), themePath); + return themePath; + }); + + return deleteThemeAndWaitForComplete(themeName) + .then(copyTheme) + .then(this.persistent(newTheme)); + }); + }, + tempDir -> deleteRecursivelyAndSilently(tempDir, scheduler) + ); + + return checkTheme.then(upgradeTheme); } /** diff --git a/application/src/main/java/run/halo/app/core/extension/theme/ThemeUtils.java b/application/src/main/java/run/halo/app/core/extension/theme/ThemeUtils.java index 6de1b3f03..a0276a36e 100644 --- a/application/src/main/java/run/halo/app/core/extension/theme/ThemeUtils.java +++ b/application/src/main/java/run/halo/app/core/extension/theme/ThemeUtils.java @@ -1,12 +1,11 @@ package run.halo.app.core.extension.theme; -import static java.nio.file.Files.createTempDirectory; import static org.springframework.util.FileSystemUtils.copyRecursively; +import static run.halo.app.infra.utils.FileUtils.createTempDir; import static run.halo.app.infra.utils.FileUtils.deleteRecursivelyAndSilently; import static run.halo.app.infra.utils.FileUtils.unzip; import java.io.IOException; -import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -15,13 +14,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.BaseStream; import java.util.stream.Stream; -import java.util.zip.ZipInputStream; import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.lang.Nullable; import org.springframework.util.CollectionUtils; import org.springframework.web.server.ResponseStatusException; @@ -29,6 +28,7 @@ import org.springframework.web.server.ServerWebInputException; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import run.halo.app.core.extension.Theme; import run.halo.app.extension.Unstructured; @@ -101,64 +101,51 @@ class ThemeUtils { } } - static Mono unzipThemeTo(InputStream inputStream, Path themeWorkDir) { - return unzipThemeTo(inputStream, themeWorkDir, false) + static Mono unzipThemeTo(Publisher content, Path themeWorkDir, + Scheduler scheduler) { + return unzipThemeTo(content, themeWorkDir, false, scheduler) .onErrorMap(e -> !(e instanceof ResponseStatusException), e -> { log.error("Failed to unzip theme", e); throw new ServerWebInputException("Failed to unzip theme"); }); } - static Mono unzipThemeTo(InputStream inputStream, Path themeWorkDir, - boolean override) { - var tempDir = new AtomicReference(); - return Mono.just(inputStream) - .publishOn(Schedulers.boundedElastic()) - .doFirst(() -> { - try { - tempDir.set(createTempDirectory(THEME_TMP_PREFIX)); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }) - .doOnNext(is -> { - try (var zipIs = new ZipInputStream(is)) { - // unzip input stream into temporary directory - unzip(zipIs, tempDir.get()); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }) - .flatMap(is -> ThemeUtils.locateThemeManifest(tempDir.get())) - .switchIfEmpty( - Mono.error(() -> new ThemeInstallationException("Missing theme manifest", - "problemDetail.theme.install.missingManifest", null))) - .map(themeManifestPath -> { - var theme = loadThemeManifest(themeManifestPath); - var themeName = theme.getMetadata().getName(); - var themeTargetPath = themeWorkDir.resolve(themeName); - try { - if (!override && !FileUtils.isEmpty(themeTargetPath)) { - throw new ThemeAlreadyExistsException(themeName); - } - // install theme to theme work dir - copyRecursively(themeManifestPath.getParent(), themeTargetPath); - return theme; - } catch (IOException e) { - deleteRecursivelyAndSilently(themeTargetPath); - throw Exceptions.propagate(e); - } - }) - .doFinally(signalType -> { - FileUtils.closeQuietly(inputStream); - deleteRecursivelyAndSilently(tempDir.get()); - }); + static Mono unzipThemeTo(Publisher content, Path themeWorkDir, + boolean override, Scheduler scheduler) { + return Mono.usingWhen( + createTempDir(THEME_TMP_PREFIX, scheduler), + tempDir -> { + var locateThemeManifest = Mono.fromCallable(() -> locateThemeManifest(tempDir) + .orElseThrow(() -> new ThemeInstallationException("Missing theme manifest", + "problemDetail.theme.install.missingManifest", null))); + return unzip(content, tempDir, scheduler) + .then(locateThemeManifest) + .handle((themeManifestPath, sink) -> { + var theme = loadThemeManifest(themeManifestPath); + var themeName = theme.getMetadata().getName(); + var themeTargetPath = themeWorkDir.resolve(themeName); + try { + if (!override && !FileUtils.isEmpty(themeTargetPath)) { + sink.error(new ThemeAlreadyExistsException(themeName)); + return; + } + // install theme to theme work dir + copyRecursively(themeManifestPath.getParent(), themeTargetPath); + sink.next(theme); + } catch (IOException e) { + deleteRecursivelyAndSilently(themeTargetPath); + sink.error(e); + } + }) + .subscribeOn(scheduler); + }, + tempDir -> FileUtils.deleteRecursivelyAndSilently(tempDir, scheduler) + ); } static Unstructured loadThemeManifest(Path themeManifestPath) { - List unstructureds = - new YamlUnstructuredLoader(new FileSystemResource(themeManifestPath)) - .load(); + var unstructureds = new YamlUnstructuredLoader(new FileSystemResource(themeManifestPath)) + .load(); if (CollectionUtils.isEmpty(unstructureds)) { throw new ThemeInstallationException("Missing theme manifest", "problemDetail.theme.install.missingManifest", null); @@ -177,37 +164,35 @@ class ThemeUtils { return null; } - static Mono locateThemeManifest(Path dir) { - return Mono.justOrEmpty(dir) - .filter(Files::isDirectory) - .publishOn(Schedulers.boundedElastic()) - .mapNotNull(path -> { - var queue = new LinkedList(); - queue.add(dir); - var manifest = Optional.empty(); - while (!queue.isEmpty()) { - var current = queue.pop(); - try (Stream subPaths = Files.list(current)) { - manifest = subPaths.filter(Files::isReadable) - .filter(subPath -> { - if (Files.isDirectory(subPath)) { - queue.add(subPath); - return false; - } - return true; - }) - .filter(Files::isRegularFile) - .filter(ThemeUtils::isManifest) - .findFirst(); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - if (manifest.isPresent()) { - break; - } - } - return manifest.orElse(null); - }); + static Optional locateThemeManifest(Path path) { + if (!Files.isDirectory(path)) { + return Optional.empty(); + } + var queue = new LinkedList(); + queue.add(path); + var manifest = Optional.empty(); + while (!queue.isEmpty()) { + var current = queue.pop(); + try (Stream subPaths = Files.list(current)) { + manifest = subPaths.filter(Files::isReadable) + .filter(subPath -> { + if (Files.isDirectory(subPath)) { + queue.add(subPath); + return false; + } + return true; + }) + .filter(Files::isRegularFile) + .filter(ThemeUtils::isManifest) + .findFirst(); + } catch (IOException e) { + throw Exceptions.propagate(e); + } + if (manifest.isPresent()) { + break; + } + } + return manifest; } static boolean isManifest(Path file) { diff --git a/application/src/main/java/run/halo/app/infra/DefaultThemeInitializer.java b/application/src/main/java/run/halo/app/infra/DefaultThemeInitializer.java index 460853c9e..d8d74d1cb 100644 --- a/application/src/main/java/run/halo/app/infra/DefaultThemeInitializer.java +++ b/application/src/main/java/run/halo/app/infra/DefaultThemeInitializer.java @@ -1,12 +1,14 @@ package run.halo.app.infra; import java.io.IOException; -import java.util.concurrent.CountDownLatch; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; -import org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.core.io.UrlResource; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.stereotype.Component; import org.springframework.util.ResourceUtils; +import org.springframework.util.StreamUtils; import run.halo.app.core.extension.theme.ThemeService; import run.halo.app.infra.properties.HaloProperties; import run.halo.app.infra.properties.ThemeProperties; @@ -41,20 +43,18 @@ public class DefaultThemeInitializer implements ApplicationListener latch.countDown()) - .subscribe(theme -> log.info("Initialized default theme: {}", - theme.getMetadata().getName())); - latch.await(); + var themeUrl = ResourceUtils.getURL(location); + var content = DataBufferUtils.read(new UrlResource(themeUrl), + DefaultDataBufferFactory.sharedInstance, + StreamUtils.BUFFER_SIZE); + var theme = themeService.install(content).block(); + log.info("Initialized default theme: {}", theme); // Because default active theme is default, we don't need to enabled it manually. - } catch (IOException | InterruptedException e) { + } catch (IOException e) { // we should skip the initialization error at here log.warn("Failed to initialize theme from " + location, e); } diff --git a/application/src/main/java/run/halo/app/infra/utils/DataBufferUtils.java b/application/src/main/java/run/halo/app/infra/utils/DataBufferUtils.java index ee03cf89b..dc2717867 100644 --- a/application/src/main/java/run/halo/app/infra/utils/DataBufferUtils.java +++ b/application/src/main/java/run/halo/app/infra/utils/DataBufferUtils.java @@ -8,35 +8,36 @@ import java.io.InputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; import org.springframework.core.io.buffer.DataBuffer; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.util.context.Context; @Slf4j -public final class DataBufferUtils { +public enum DataBufferUtils { + ; - private DataBufferUtils() { + public static Mono toInputStream(Publisher content) { + return toInputStream(content, Schedulers.boundedElastic()); } - public static InputStream toInputStream(Flux content) throws IOException { - var pos = new PipedOutputStream(); - var pis = new PipedInputStream(pos); - write(content, pos) - .doOnComplete(() -> { - try { - pos.close(); - } catch (IOException ignored) { - // Ignore the error - } - }) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe(releaseConsumer(), error -> { - if (error instanceof IOException) { - // Ignore the error - return; - } - log.error("Failed to write DataBuffer into OutputStream", error); - }); - return pis; + public static Mono toInputStream(Publisher content, + Scheduler scheduler) { + return Mono.create(sink -> { + try { + var pos = new PipedOutputStream(); + var pis = new PipedInputStream(pos); + var disposable = write(content, pos) + .subscribeOn(scheduler) + .subscribe(releaseConsumer(), sink::error, () -> FileUtils.closeQuietly(pos), + Context.of(sink.contextView())); + sink.onDispose(disposable); + sink.success(pis); + } catch (IOException e) { + sink.error(e); + } + }); } } diff --git a/application/src/main/java/run/halo/app/infra/utils/FileUtils.java b/application/src/main/java/run/halo/app/infra/utils/FileUtils.java index 44bedaf08..ca790bfde 100644 --- a/application/src/main/java/run/halo/app/infra/utils/FileUtils.java +++ b/application/src/main/java/run/halo/app/infra/utils/FileUtils.java @@ -2,6 +2,7 @@ package run.halo.app.infra.utils; import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; import static org.springframework.util.FileSystemUtils.deleteRecursively; +import static run.halo.app.infra.utils.DataBufferUtils.toInputStream; import java.io.Closeable; import java.io.IOException; @@ -23,10 +24,13 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.lang.NonNull; import org.springframework.util.AntPathMatcher; import org.springframework.util.Assert; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import run.halo.app.infra.exception.AccessDeniedException; @@ -40,6 +44,26 @@ public abstract class FileUtils { private FileUtils() { } + public static Mono unzip(Publisher content, @NonNull Path targetPath) { + return unzip(content, targetPath, Schedulers.boundedElastic()); + } + + public static Mono unzip(Publisher content, @NonNull Path targetPath, + Scheduler scheduler) { + return Mono.usingWhen( + toInputStream(content, scheduler), + is -> { + try (var zis = new ZipInputStream(is)) { + unzip(zis, targetPath); + return Mono.empty(); + } catch (IOException e) { + return Mono.error(e); + } + }, + is -> Mono.fromRunnable(() -> closeQuietly(is)) + ); + } + public static void unzip(@NonNull ZipInputStream zis, @NonNull Path targetPath) throws IOException { // 1. unzip file to folder @@ -170,7 +194,7 @@ public abstract class FileUtils { * the given {@code consumer}. * * @param closeable The resource to close, may be null. - * @param consumer Consumes the IOException thrown by {@link Closeable#close()}. + * @param consumer Consumes the IOException thrown by {@link Closeable#close()}. */ public static void closeQuietly(final Closeable closeable, final Consumer consumer) { @@ -188,7 +212,7 @@ public abstract class FileUtils { /** * Checks directory traversal vulnerability. * - * @param parentPath parent path must not be null. + * @param parentPath parent path must not be null. * @param pathToCheck path to check must not be null */ public static void checkDirectoryTraversal(@NonNull Path parentPath, @@ -207,7 +231,7 @@ public abstract class FileUtils { /** * Checks directory traversal vulnerability. * - * @param parentPath parent path must not be null. + * @param parentPath parent path must not be null. * @param pathToCheck path to check must not be null */ public static void checkDirectoryTraversal(@NonNull String parentPath, @@ -218,7 +242,7 @@ public abstract class FileUtils { /** * Checks directory traversal vulnerability. * - * @param parentPath parent path must not be null. + * @param parentPath parent path must not be null. * @param pathToCheck path to check must not be null */ public static void checkDirectoryTraversal(@NonNull Path parentPath, @@ -237,15 +261,27 @@ public abstract class FileUtils { if (log.isDebugEnabled()) { log.debug("Delete {} result: {}", root, deleted); } - } catch (IOException e) { + } catch (IOException ignored) { // Ignore this error - if (log.isTraceEnabled()) { - log.trace("Failed to delete {} recursively", root); - } } } + public static Mono deleteRecursivelyAndSilently(Path root, Scheduler scheduler) { + return Mono.fromSupplier(() -> { + try { + return deleteRecursively(root); + } catch (IOException ignored) { + return false; + } + }).subscribeOn(scheduler); + } + + public static Mono deleteFileSilently(Path file) { + return deleteFileSilently(file, Schedulers.boundedElastic()); + } + + public static Mono deleteFileSilently(Path file, Scheduler scheduler) { return Mono.fromSupplier( () -> { if (file == null || !Files.isRegularFile(file)) { @@ -257,7 +293,7 @@ public abstract class FileUtils { return false; } }) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(scheduler); } public static void copy(Path source, Path dest, CopyOption... options) { @@ -295,4 +331,7 @@ public abstract class FileUtils { }); } + public static Mono createTempDir(String prefix, Scheduler scheduler) { + return Mono.fromCallable(() -> Files.createTempDirectory(prefix)).subscribeOn(scheduler); + } } diff --git a/application/src/main/java/run/halo/app/migration/impl/MigrationServiceImpl.java b/application/src/main/java/run/halo/app/migration/impl/MigrationServiceImpl.java index 04965de4a..40d81fed6 100644 --- a/application/src/main/java/run/halo/app/migration/impl/MigrationServiceImpl.java +++ b/application/src/main/java/run/halo/app/migration/impl/MigrationServiceImpl.java @@ -1,38 +1,37 @@ package run.halo.app.migration.impl; -import static org.springframework.core.io.buffer.DataBufferUtils.releaseConsumer; -import static run.halo.app.infra.utils.FileUtils.closeQuietly; +import static java.nio.file.Files.deleteIfExists; +import static org.springframework.util.FileSystemUtils.copyRecursively; +import static run.halo.app.infra.utils.FileUtils.checkDirectoryTraversal; +import static run.halo.app.infra.utils.FileUtils.copyRecursively; +import static run.halo.app.infra.utils.FileUtils.createTempDir; import static run.halo.app.infra.utils.FileUtils.deleteRecursivelyAndSilently; +import static run.halo.app.infra.utils.FileUtils.unzip; import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; import java.io.IOException; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Locale; import java.util.Set; -import java.util.zip.ZipInputStream; import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Publisher; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import org.springframework.util.FileSystemUtils; import org.springframework.web.server.ServerWebInputException; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import reactor.util.context.Context; import run.halo.app.extension.store.ExtensionStore; import run.halo.app.extension.store.ExtensionStoreRepository; import run.halo.app.infra.exception.NotFoundException; @@ -67,6 +66,8 @@ public class MigrationServiceImpl implements MigrationService { private final DateTimeFormatter dateTimeFormatter; + private final Scheduler scheduler = Schedulers.boundedElastic(); + public MigrationServiceImpl(ExtensionStoreRepository repository, HaloProperties haloProperties) { this.repository = repository; @@ -94,55 +95,51 @@ public class MigrationServiceImpl implements MigrationService { @Override public Mono backup(Backup backup) { - try { - // create temporary folder to store all backup files into single files. - var tempDir = Files.createTempDirectory("halo-full-backup-"); - return backupExtensions(tempDir) - .and(backupWorkDir(tempDir)) - .and(packageBackup(tempDir, backup)) - .doFinally(signalType -> deleteRecursivelyAndSilently(tempDir)) - .subscribeOn(Schedulers.boundedElastic()); - } catch (IOException e) { - return Mono.error(e); - } + return Mono.usingWhen( + createTempDir("halo-full-backup-", scheduler), + tempDir -> backupExtensions(tempDir) + .then(Mono.defer(() -> backupWorkDir(tempDir))) + .then(Mono.defer(() -> packageBackup(tempDir, backup))), + tempDir -> deleteRecursivelyAndSilently(tempDir, scheduler) + ); } @Override public Mono download(Backup backup) { - var status = backup.getStatus(); - if (!Backup.Phase.SUCCEEDED.equals(status.getPhase()) || status.getFilename() == null) { - return Mono.error(new ServerWebInputException("Current backup is not downloadable.")); - } - - var backupFile = getBackupsRoot().resolve(status.getFilename()); - var resource = new FileSystemResource(backupFile); - if (!resource.exists()) { - return Mono.error(new NotFoundException("problemDetail.migration.backup.notFound", - new Object[] {}, "Backup file doesn't exist or deleted.")); - } - return Mono.just(resource); + return Mono.create(sink -> { + var status = backup.getStatus(); + if (!Backup.Phase.SUCCEEDED.equals(status.getPhase()) || status.getFilename() == null) { + sink.error(new ServerWebInputException("Current backup is not downloadable.")); + return; + } + var backupFile = getBackupsRoot().resolve(status.getFilename()); + var resource = new FileSystemResource(backupFile); + if (!resource.exists()) { + sink.error( + new NotFoundException("problemDetail.migration.backup.notFound", + new Object[] {}, + "Backup file doesn't exist or deleted.")); + return; + } + sink.success(resource); + }); } @Override @Transactional public Mono restore(Publisher content) { - return Mono.defer(() -> { - try { - var tempDir = Files.createTempDirectory("halo-restore-"); - return unpackBackup(content, tempDir) - .and(restoreExtensions(tempDir)) - .and(restoreWorkdir(tempDir)) - .doFinally(signalType -> deleteRecursivelyAndSilently(tempDir)) - .subscribeOn(Schedulers.boundedElastic()); - } catch (IOException e) { - return Mono.error(e); - } - }); + return Mono.usingWhen( + createTempDir("halo-restore-", scheduler), + tempDir -> unpackBackup(content, tempDir) + .then(Mono.defer(() -> restoreExtensions(tempDir))) + .then(Mono.defer(() -> restoreWorkdir(tempDir))), + tempDir -> deleteRecursivelyAndSilently(tempDir, scheduler) + ); } @Override public Mono cleanup(Backup backup) { - return Mono.fromRunnable(() -> { + return Mono.create(sink -> { var status = backup.getStatus(); if (status == null || status.getFilename() == null) { return; @@ -151,110 +148,104 @@ public class MigrationServiceImpl implements MigrationService { var backupsRoot = getBackupsRoot(); var backupFile = backupsRoot.resolve(filename); try { - FileUtils.checkDirectoryTraversal(backupsRoot, backupFile); - Files.deleteIfExists(backupFile); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }).subscribeOn(Schedulers.boundedElastic()); - } - - private Mono restoreWorkdir(Path backupRoot) { - return Mono.fromRunnable(() -> { - try { - var workdir = backupRoot.resolve("workdir"); - if (Files.exists(workdir)) { - FileSystemUtils.copyRecursively(workdir, haloProperties.getWorkDir()); - } - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }); - } - - private Mono restoreExtensions(Path backupRoot) { - var extensionsPath = backupRoot.resolve("extensions.data"); - var reader = objectMapper.readerFor(ExtensionStore.class); - return Mono.>using( - () -> reader.readValues(extensionsPath.toFile()), - itr -> Flux.create( - sink -> { - while (itr.hasNext()) { - sink.next(itr.next()); - } - sink.complete(); - }) - // reset version - .doOnNext(extensionStore -> extensionStore.setVersion(null)) - .buffer(100) - // We might encounter OptimisticLockingFailureException when saving extension store, - // So we have to delete all extension stores before saving. - .flatMap(extensionStores -> repository.deleteAll(extensionStores) - .thenMany(repository.saveAll(extensionStores))) - .doOnNext(extensionStore -> - log.info("Restored extension store: {}", extensionStore.getName())) - .then(), - itr -> { - try { - itr.close(); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }); - } - - private Mono unpackBackup(Publisher content, Path target) { - return Mono.create(sink -> { - try (var pipedIs = new PipedInputStream(); - var pipedOs = new PipedOutputStream(pipedIs); - var zipIs = new ZipInputStream(pipedIs)) { - DataBufferUtils.write(content, pipedOs) - .subscribe( - releaseConsumer(), - sink::error, - () -> closeQuietly(pipedOs), - Context.of(sink.contextView())); - FileUtils.unzip(zipIs, target); + checkDirectoryTraversal(backupsRoot, backupFile); + deleteIfExists(backupFile); sink.success(); } catch (IOException e) { sink.error(e); } - }); + }).subscribeOn(scheduler); + } + + private Mono restoreWorkdir(Path backupRoot) { + return Mono.create(sink -> { + try { + var workdir = backupRoot.resolve("workdir"); + if (Files.exists(workdir)) { + copyRecursively(workdir, haloProperties.getWorkDir()); + } + sink.success(); + } catch (IOException e) { + sink.error(e); + } + }).subscribeOn(scheduler); + } + + private Mono restoreExtensions(Path backupRoot) { + var extensionsPath = backupRoot.resolve("extensions.data"); + if (Files.notExists(extensionsPath)) { + return Mono.empty(); + } + var reader = objectMapper.readerFor(ExtensionStore.class); + return Mono.>using( + () -> reader.readValues(extensionsPath.toFile()), + itr -> Flux.create( + sink -> { + while (itr.hasNext()) { + sink.next(itr.next()); + } + sink.complete(); + }) + // reset version + .doOnNext(extensionStore -> extensionStore.setVersion(null)).buffer(100) + // We might encounter OptimisticLockingFailureException when saving extension + // store, + // So we have to delete all extension stores before saving. + .flatMap(extensionStores -> repository.deleteAll(extensionStores) + .thenMany(repository.saveAll(extensionStores)) + ) + .doOnNext(extensionStore -> log.info("Restored extension store: {}", + extensionStore.getName())) + .then(), + FileUtils::closeQuietly) + .subscribeOn(scheduler); + } + + private Mono unpackBackup(Publisher content, Path target) { + return unzip(content, target, scheduler); } private Mono packageBackup(Path baseDir, Backup backup) { - return Mono.fromRunnable(() -> { - try { - var backupsFolder = getBackupsRoot(); - Files.createDirectories(backupsFolder); + return Mono.fromCallable( + () -> { + var backupsFolder = getBackupsRoot(); + Files.createDirectories(backupsFolder); + return backupsFolder; + }) + .handle((backupsFolder, sink) -> { var backupName = backup.getMetadata().getName(); var startTimestamp = backup.getStatus().getStartTimestamp(); var timePart = this.dateTimeFormatter.format(startTimestamp); var backupFile = backupsFolder.resolve(timePart + '-' + backupName + ".zip"); - FileUtils.zip(baseDir, backupFile); - backup.getStatus().setFilename(backupFile.getFileName().toString()); - backup.getStatus().setSize(Files.size(backupFile)); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }); + try { + FileUtils.zip(baseDir, backupFile); + backup.getStatus().setFilename(backupFile.getFileName().toString()); + backup.getStatus().setSize(Files.size(backupFile)); + sink.complete(); + } catch (IOException e) { + sink.error(e); + } + }) + .subscribeOn(scheduler); } private Mono backupWorkDir(Path baseDir) { - return Mono.fromRunnable(() -> { - try { - var workdirPath = Files.createDirectory(baseDir.resolve("workdir")); - FileUtils.copyRecursively(haloProperties.getWorkDir(), workdirPath, excludes); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }); + return Mono.fromCallable(() -> Files.createDirectory(baseDir.resolve("workdir"))) + .handle((workdirPath, sink) -> { + try { + copyRecursively(haloProperties.getWorkDir(), workdirPath, excludes); + sink.complete(); + } catch (IOException e) { + sink.error(e); + } + }) + .subscribeOn(scheduler); } private Mono backupExtensions(Path baseDir) { - try { - var extensionsPath = Files.createFile(baseDir.resolve("extensions.data")); - return Mono.using(() -> objectMapper.writerFor(ExtensionStore.class) + return Mono.fromCallable(() -> Files.createFile(baseDir.resolve("extensions.data"))) + .flatMap(extensionsPath -> Mono.using( + () -> objectMapper.writerFor(ExtensionStore.class) .writeValuesAsArray(extensionsPath.toFile()), seqWriter -> repository.findAll() .doOnNext(extensionStore -> { @@ -263,17 +254,8 @@ public class MigrationServiceImpl implements MigrationService { } catch (IOException e) { throw Exceptions.propagate(e); } - }) - .then(), - seqWriter -> { - try { - seqWriter.close(); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }); - } catch (IOException e) { - return Mono.error(e); - } + }).then(), + FileUtils::closeQuietly)) + .subscribeOn(scheduler); } } diff --git a/application/src/test/java/run/halo/app/core/extension/theme/ThemeEndpointTest.java b/application/src/test/java/run/halo/app/core/extension/theme/ThemeEndpointTest.java index 7425fad48..4fc3194bb 100644 --- a/application/src/test/java/run/halo/app/core/extension/theme/ThemeEndpointTest.java +++ b/application/src/test/java/run/halo/app/core/extension/theme/ThemeEndpointTest.java @@ -4,7 +4,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -12,7 +11,6 @@ import static org.springframework.web.reactive.function.BodyInserters.fromMultip import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; @@ -24,15 +22,14 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.reactivestreams.Publisher; import org.springframework.core.io.FileSystemResource; -import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.MediaType; import org.springframework.http.client.MultipartBodyBuilder; import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.util.FileSystemUtils; import org.springframework.util.ResourceUtils; import org.springframework.web.server.ServerWebInputException; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import run.halo.app.core.extension.Setting; import run.halo.app.core.extension.Theme; @@ -70,7 +67,7 @@ class ThemeEndpointTest { private SystemConfigurableEnvironmentFetcher environmentFetcher; @Mock - private ReactiveUrlDataBufferFetcher reactiveUrlDataBufferFetcher; + private ReactiveUrlDataBufferFetcher urlDataBufferFetcher; @InjectMocks ThemeEndpoint themeEndpoint; @@ -105,7 +102,7 @@ class ThemeEndpointTest { bodyBuilder.part("file", new FileSystemResource(defaultTheme)) .contentType(MediaType.MULTIPART_FORM_DATA); - when(themeService.upgrade(eq("invalid-missing-manifest"), isA(InputStream.class))) + when(themeService.upgrade(eq("invalid-missing-manifest"), isA(Publisher.class))) .thenReturn( Mono.error(() -> new ServerWebInputException("Failed to upgrade theme"))); @@ -115,7 +112,7 @@ class ThemeEndpointTest { .exchange() .expectStatus().isBadRequest(); - verify(themeService).upgrade(eq("invalid-missing-manifest"), isA(InputStream.class)); + verify(themeService).upgrade(eq("invalid-missing-manifest"), isA(Publisher.class)); } @Test @@ -129,7 +126,7 @@ class ThemeEndpointTest { var newTheme = new Theme(); newTheme.setMetadata(metadata); - when(themeService.upgrade(eq("default"), isA(InputStream.class))) + when(themeService.upgrade(eq("default"), isA(Publisher.class))) .thenReturn(Mono.just(newTheme)); when(templateEngineManager.clearCache(eq("default"))) @@ -141,22 +138,20 @@ class ThemeEndpointTest { .exchange() .expectStatus().isOk(); - verify(themeService).upgrade(eq("default"), isA(InputStream.class)); + verify(themeService).upgrade(eq("default"), isA(Publisher.class)); verify(templateEngineManager, times(1)).clearCache(eq("default")); } @Test void upgradeFromUri() { - final URI uri = URI.create("https://example.com/test-theme.zip"); - Theme fakeTheme = mock(Theme.class); - Metadata metadata = new Metadata(); + var uri = URI.create("https://example.com/test-theme.zip"); + var metadata = new Metadata(); metadata.setName("default"); - when(fakeTheme.getMetadata()).thenReturn(metadata); - when(themeService.upgrade(eq("default"), isA(InputStream.class))) + var fakeTheme = new Theme(); + fakeTheme.setMetadata(metadata); + when(themeService.upgrade(eq("default"), any())) .thenReturn(Mono.just(fakeTheme)); - when(reactiveUrlDataBufferFetcher.fetch(eq(uri))) - .thenReturn(Flux.just(mock(DataBuffer.class))); when(templateEngineManager.clearCache(eq("default"))) .thenReturn(Mono.empty()); var body = new ThemeEndpoint.UpgradeFromUriRequest(uri); @@ -164,13 +159,12 @@ class ThemeEndpointTest { .uri("/themes/default/upgrade-from-uri") .bodyValue(body) .exchange() - .expectStatus().isOk(); + .expectStatus().isOk() + .expectBody(Theme.class).isEqualTo(fakeTheme); - verify(themeService).upgrade(eq("default"), isA(InputStream.class)); + verify(themeService).upgrade(eq("default"), any()); verify(templateEngineManager, times(1)).clearCache(eq("default")); - - verify(reactiveUrlDataBufferFetcher).fetch(eq(uri)); } } @@ -210,20 +204,21 @@ class ThemeEndpointTest { @Test void installFromUri() { final URI uri = URI.create("https://example.com/test-theme.zip"); - Theme fakeTheme = mock(Theme.class); - when(themeService.install(isA(InputStream.class))) - .thenReturn(Mono.just(fakeTheme)); - when(reactiveUrlDataBufferFetcher.fetch(eq(uri))) - .thenReturn(Flux.just(mock(DataBuffer.class))); + var metadata = new Metadata(); + metadata.setName("fake-theme"); + var theme = new Theme(); + theme.setMetadata(metadata); + + when(themeService.install(any())).thenReturn(Mono.just(theme)); var body = new ThemeEndpoint.UpgradeFromUriRequest(uri); webTestClient.post() .uri("/themes/-/install-from-uri") .bodyValue(body) .exchange() - .expectStatus().isOk(); + .expectStatus().isOk() + .expectBody(Theme.class).isEqualTo(theme); - verify(themeService).install(isA(InputStream.class)); - verify(reactiveUrlDataBufferFetcher).fetch(eq(uri)); + verify(themeService).install(any()); } @Test diff --git a/application/src/test/java/run/halo/app/core/extension/theme/ThemeServiceImplTest.java b/application/src/test/java/run/halo/app/core/extension/theme/ThemeServiceImplTest.java index eed3f5285..045b5f14e 100644 --- a/application/src/test/java/run/halo/app/core/extension/theme/ThemeServiceImplTest.java +++ b/application/src/test/java/run/halo/app/core/extension/theme/ThemeServiceImplTest.java @@ -32,7 +32,11 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; import org.skyscreamer.jsonassert.JSONAssert; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.util.ResourceUtils; +import org.springframework.util.StreamUtils; import org.springframework.web.server.ServerWebInputException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -112,6 +116,13 @@ class ThemeServiceImplTest { return Unstructured.OBJECT_MAPPER.convertValue(theme, Unstructured.class); } + Flux content(Path path) { + return DataBufferUtils.read( + path, + DefaultDataBufferFactory.sharedInstance, + StreamUtils.BUFFER_SIZE); + } + @Nested class UpgradeTest { @@ -119,10 +130,8 @@ class ThemeServiceImplTest { void shouldFailIfThemeNotInstalledBefore() throws IOException, URISyntaxException { var themeZipPath = prepareTheme("other"); when(client.fetch(Theme.class, "default")).thenReturn(Mono.empty()); - try (var is = Files.newInputStream(themeZipPath)) { - StepVerifier.create(themeService.upgrade("default", is)) - .verifyError(ServerWebInputException.class); - } + StepVerifier.create(themeService.upgrade("default", content(themeZipPath))) + .verifyError(ServerWebInputException.class); verify(client).fetch(Theme.class, "default"); } @@ -144,14 +153,12 @@ class ThemeServiceImplTest { when(client.create(isA(Unstructured.class))).thenReturn( Mono.just(convert(createTheme(t -> t.getSpec().setDisplayName("New fake theme"))))); - try (var is = Files.newInputStream(themeZipPath)) { - StepVerifier.create(themeService.upgrade("default", is)) - .consumeNextWith(newTheme -> { - assertEquals("default", newTheme.getMetadata().getName()); - assertEquals("New fake theme", newTheme.getSpec().getDisplayName()); - }) - .verifyComplete(); - } + StepVerifier.create(themeService.upgrade("default", content(themeZipPath))) + .consumeNextWith(newTheme -> { + assertEquals("default", newTheme.getMetadata().getName()); + assertEquals("New fake theme", newTheme.getSpec().getDisplayName()); + }) + .verifyComplete(); verify(client, times(3)).fetch(Theme.class, "default"); verify(client).delete(oldTheme); @@ -168,14 +175,12 @@ class ThemeServiceImplTest { var defaultThemeZipPath = prepareTheme("default"); when(client.create(isA(Unstructured.class))).thenReturn( Mono.just(convert(createTheme()))); - try (var is = Files.newInputStream(defaultThemeZipPath)) { - StepVerifier.create(themeService.install(is)) - .consumeNextWith(theme -> { - assertEquals("default", theme.getMetadata().getName()); - assertEquals("Default", theme.getSpec().getDisplayName()); - }) - .verifyComplete(); - } + StepVerifier.create(themeService.install(content(defaultThemeZipPath))) + .consumeNextWith(theme -> { + assertEquals("default", theme.getMetadata().getName()); + assertEquals("Default", theme.getSpec().getDisplayName()); + }) + .verifyComplete(); } @Test @@ -183,19 +188,15 @@ class ThemeServiceImplTest { var defaultThemeZipPath = prepareTheme("default"); when(client.create(isA(Unstructured.class))).thenReturn( Mono.error(() -> new ExtensionException("Failed to create the extension"))); - try (var is = Files.newInputStream(defaultThemeZipPath)) { - StepVerifier.create(themeService.install(is)) - .verifyError(ExtensionException.class); - } + StepVerifier.create(themeService.install(content(defaultThemeZipPath))) + .verifyError(ExtensionException.class); } @Test void shouldFailWhenThemeManifestIsInvalid() throws IOException, URISyntaxException { var defaultThemeZipPath = prepareTheme("invalid-missing-manifest"); - try (var is = Files.newInputStream(defaultThemeZipPath)) { - StepVerifier.create(themeService.install(is)) - .verifyError(ThemeInstallationException.class); - } + StepVerifier.create(themeService.install(content(defaultThemeZipPath))) + .verifyError(ThemeInstallationException.class); } }