diff --git a/application/src/main/java/run/halo/app/core/extension/endpoint/PluginEndpoint.java b/application/src/main/java/run/halo/app/core/extension/endpoint/PluginEndpoint.java index 2426e8b81..430798aef 100644 --- a/application/src/main/java/run/halo/app/core/extension/endpoint/PluginEndpoint.java +++ b/application/src/main/java/run/halo/app/core/extension/endpoint/PluginEndpoint.java @@ -9,16 +9,16 @@ import static org.springdoc.core.fn.builders.parameter.Builder.parameterBuilder; import static org.springdoc.core.fn.builders.requestbody.Builder.requestBodyBuilder; import static org.springdoc.core.fn.builders.schema.Builder.schemaBuilder; import static org.springframework.boot.convert.ApplicationConversionService.getSharedInstance; +import static org.springframework.core.io.buffer.DataBufferUtils.write; import static org.springframework.web.reactive.function.server.RequestPredicates.contentType; import static run.halo.app.extension.ListResult.generateGenericClass; import static run.halo.app.extension.router.QueryParamBuildUtil.buildParametersFromType; import static run.halo.app.extension.router.selector.SelectorUtil.labelAndFieldSelectorToPredicate; +import static run.halo.app.infra.utils.FileUtils.deleteFileSilently; import io.swagger.v3.oas.annotations.enums.ParameterIn; import io.swagger.v3.oas.annotations.media.ArraySchema; import io.swagger.v3.oas.annotations.media.Schema; -import java.io.IOException; -import java.io.InputStream; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; @@ -33,7 +33,9 @@ import java.util.function.Function; import java.util.function.Predicate; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; import org.springdoc.webflux.core.fn.SpringdocRouteBuilder; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.domain.Sort; import org.springframework.http.MediaType; @@ -41,7 +43,6 @@ import org.springframework.http.codec.multipart.FilePart; import org.springframework.http.codec.multipart.FormFieldPart; import org.springframework.http.codec.multipart.Part; import org.springframework.stereotype.Component; -import org.springframework.util.FileCopyUtils; import org.springframework.util.MultiValueMap; import org.springframework.util.StringUtils; import org.springframework.web.reactive.function.server.RouterFunction; @@ -50,6 +51,7 @@ import org.springframework.web.reactive.function.server.ServerResponse; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebInputException; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; import run.halo.app.core.extension.Plugin; @@ -61,9 +63,6 @@ import run.halo.app.extension.ConfigMap; import run.halo.app.extension.ReactiveExtensionClient; import run.halo.app.extension.router.IListRequest.QueryListRequest; import run.halo.app.infra.ReactiveUrlDataBufferFetcher; -import run.halo.app.infra.exception.ThemeInstallationException; -import run.halo.app.infra.exception.ThemeUpgradeException; -import run.halo.app.infra.utils.DataBufferUtils; import run.halo.app.plugin.PluginNotFoundException; @Slf4j @@ -77,6 +76,8 @@ public class PluginEndpoint implements CustomEndpoint { private final ReactiveUrlDataBufferFetcher reactiveUrlDataBufferFetcher; + private final Scheduler scheduler = Schedulers.boundedElastic(); + @Override public RouterFunction endpoint() { final var tag = "api.console.halo.run/v1alpha1/Plugin"; @@ -221,48 +222,29 @@ public class PluginEndpoint implements CustomEndpoint { } private Mono upgradeFromUri(ServerRequest request) { - final var name = request.pathVariable("name"); - return request.bodyToMono(UpgradeFromUriRequest.class) - .flatMap(upgradeRequest -> Mono.fromCallable(() -> DataBufferUtils.toInputStream( - reactiveUrlDataBufferFetcher.fetch(upgradeRequest.uri()))) - ) - .subscribeOn(Schedulers.boundedElastic()) - .onErrorMap(throwable -> { - log.error("Failed to fetch plugin file from uri.", throwable); - return new ThemeUpgradeException("Failed to fetch plugin file from uri.", null, - null); - }) - .flatMap(inputStream -> Mono.usingWhen( - transferToTemp(inputStream), - (path) -> pluginService.upgrade(name, path), + var name = request.pathVariable("name"); + var content = request.bodyToMono(UpgradeFromUriRequest.class) + .map(UpgradeFromUriRequest::uri) + .flatMapMany(reactiveUrlDataBufferFetcher::fetch); + + return Mono.usingWhen( + writeToTempFile(content), + path -> pluginService.upgrade(name, path), this::deleteFileIfExists) - ) - .flatMap(theme -> ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(theme) - ); + .flatMap(upgradedPlugin -> ServerResponse.ok().bodyValue(upgradedPlugin)); } private Mono installFromUri(ServerRequest request) { - return request.bodyToMono(InstallFromUriRequest.class) - .flatMap(installRequest -> Mono.fromCallable(() -> DataBufferUtils.toInputStream( - reactiveUrlDataBufferFetcher.fetch(installRequest.uri()))) - ) - .subscribeOn(Schedulers.boundedElastic()) - .doOnError(throwable -> { - log.error("Failed to fetch plugin file from uri.", throwable); - throw new ThemeInstallationException("Failed to fetch plugin file from uri.", null, - null); - }) - .flatMap(inputStream -> Mono.usingWhen( - transferToTemp(inputStream), + var content = request.bodyToMono(InstallFromUriRequest.class) + .map(InstallFromUriRequest::uri) + .flatMapMany(reactiveUrlDataBufferFetcher::fetch); + + return Mono.usingWhen( + writeToTempFile(content), pluginService::install, - this::deleteFileIfExists) + this::deleteFileIfExists ) - .flatMap(theme -> ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(theme) - ); + .flatMap(newPlugin -> ServerResponse.ok().bodyValue(newPlugin)); } public record InstallFromUriRequest(@Schema(requiredMode = REQUIRED) URI uri) { @@ -402,10 +384,12 @@ public class PluginEndpoint implements CustomEndpoint { .bodyValue(upgradedPlugin)); } - private Mono installFromFile(Mono filePartMono, + private Mono installFromFile(FilePart filePart, Function> resourceClosure) { - var pathMono = filePartMono.flatMap(this::transferToTemp); - return Mono.usingWhen(pathMono, resourceClosure, this::deleteFileIfExists); + return Mono.usingWhen( + writeToTempFile(filePart.content()), + resourceClosure, + this::deleteFileIfExists); } private Mono installFromPreset(Mono presetNameMono, @@ -529,19 +513,18 @@ public class PluginEndpoint implements CustomEndpoint { } @Schema(requiredMode = NOT_REQUIRED, description = "Plugin Jar file.") - public Mono getFile() { + public FilePart getFile() { var part = multipartData.getFirst("file"); if (part == null) { - return Mono.error(new ServerWebInputException("Form field file is required")); + throw new ServerWebInputException("Form field file is required"); } if (!(part instanceof FilePart file)) { - return Mono.error(new ServerWebInputException("Invalid parameter of file")); + throw new ServerWebInputException("Invalid parameter of file"); } if (!Paths.get(file.filename()).toString().endsWith(".jar")) { - return Mono.error( - new ServerWebInputException("Invalid file type, only jar is supported")); + throw new ServerWebInputException("Invalid file type, only jar is supported"); } - return Mono.just(file); + return file; } @Schema(requiredMode = NOT_REQUIRED, @@ -584,29 +567,13 @@ public class PluginEndpoint implements CustomEndpoint { } Mono deleteFileIfExists(Path path) { - return Mono.fromRunnable(() -> { - try { - Files.deleteIfExists(path); - } catch (IOException e) { - // ignore this error - log.warn("Failed to delete temporary jar file: {}", path, e); - } - }).subscribeOn(Schedulers.boundedElastic()).then(); + return deleteFileSilently(path, this.scheduler).then(); } - private Mono transferToTemp(FilePart filePart) { - return Mono.fromCallable(() -> Files.createTempFile("halo-plugins", ".jar")) - .subscribeOn(Schedulers.boundedElastic()) - .flatMap(path -> filePart.transferTo(path) - .thenReturn(path) - ); + private Mono writeToTempFile(Publisher content) { + return Mono.fromCallable(() -> Files.createTempFile("halo-plugin-", ".jar")) + .flatMap(path -> write(content, path).thenReturn(path)) + .subscribeOn(this.scheduler); } - private Mono transferToTemp(InputStream inputStream) { - return Mono.fromCallable(() -> { - Path tempFile = Files.createTempFile("halo-plugins", ".jar"); - FileCopyUtils.copy(inputStream, Files.newOutputStream(tempFile)); - return tempFile; - }).subscribeOn(Schedulers.boundedElastic()); - } } diff --git a/application/src/main/java/run/halo/app/core/extension/theme/ThemeEndpoint.java b/application/src/main/java/run/halo/app/core/extension/theme/ThemeEndpoint.java index 0ce93cb14..233abad3e 100644 --- a/application/src/main/java/run/halo/app/core/extension/theme/ThemeEndpoint.java +++ b/application/src/main/java/run/halo/app/core/extension/theme/ThemeEndpoint.java @@ -7,11 +7,9 @@ import static org.springdoc.core.fn.builders.parameter.Builder.parameterBuilder; import static org.springdoc.core.fn.builders.requestbody.Builder.requestBodyBuilder; import static org.springdoc.core.fn.builders.schema.Builder.schemaBuilder; import static org.springframework.web.reactive.function.server.RequestPredicates.contentType; -import static run.halo.app.infra.utils.DataBufferUtils.toInputStream; import io.swagger.v3.oas.annotations.enums.ParameterIn; import io.swagger.v3.oas.annotations.media.Schema; -import java.io.IOException; import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; @@ -29,14 +27,11 @@ import org.springframework.lang.NonNull; import org.springframework.stereotype.Component; import org.springframework.util.Assert; import org.springframework.util.MultiValueMap; -import org.springframework.web.reactive.function.BodyExtractors; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import org.springframework.web.server.ServerWebInputException; -import reactor.core.Exceptions; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; import run.halo.app.core.extension.Setting; import run.halo.app.core.extension.Theme; @@ -51,9 +46,6 @@ import run.halo.app.infra.SystemConfigurableEnvironmentFetcher; import run.halo.app.infra.SystemSetting; import run.halo.app.infra.ThemeRootGetter; import run.halo.app.infra.exception.NotFoundException; -import run.halo.app.infra.exception.ThemeInstallationException; -import run.halo.app.infra.exception.ThemeUpgradeException; -import run.halo.app.infra.utils.DataBufferUtils; import run.halo.app.infra.utils.JsonUtils; import run.halo.app.theme.TemplateEngineManager; @@ -78,7 +70,7 @@ public class ThemeEndpoint implements CustomEndpoint { private final SystemConfigurableEnvironmentFetcher systemEnvironmentFetcher; - private final ReactiveUrlDataBufferFetcher reactiveUrlDataBufferFetcher; + private final ReactiveUrlDataBufferFetcher urlDataBufferFetcher; @Override public RouterFunction endpoint() { @@ -244,43 +236,25 @@ public class ThemeEndpoint implements CustomEndpoint { private Mono upgradeFromUri(ServerRequest request) { final var name = request.pathVariable("name"); - return request.bodyToMono(UpgradeFromUriRequest.class) - .flatMap(upgradeRequest -> Mono.fromCallable(() -> DataBufferUtils.toInputStream( - reactiveUrlDataBufferFetcher.fetch(upgradeRequest.uri()))) + var content = request.bodyToMono(UpgradeFromUriRequest.class) + .map(UpgradeFromUriRequest::uri) + .flatMapMany(urlDataBufferFetcher::fetch); + + return themeService.upgrade(name, content) + .flatMap((updatedTheme) -> + templateEngineManager.clearCache(updatedTheme.getMetadata().getName()) + .thenReturn(updatedTheme) ) - .subscribeOn(Schedulers.boundedElastic()) - .doOnError(throwable -> { - log.error("Failed to fetch zip file from uri.", throwable); - throw new ThemeUpgradeException("Failed to fetch zip file from uri.", null, - null); - }) - .flatMap(inputStream -> themeService.upgrade(name, inputStream)) - .flatMap((updatedTheme) -> templateEngineManager.clearCache( - updatedTheme.getMetadata().getName()) - .thenReturn(updatedTheme) - ) - .flatMap(theme -> ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(theme) - ); + .flatMap(theme -> ServerResponse.ok().bodyValue(theme)); } private Mono installFromUri(ServerRequest request) { - return request.bodyToMono(InstallFromUriRequest.class) - .flatMap(installRequest -> Mono.fromCallable(() -> DataBufferUtils.toInputStream( - reactiveUrlDataBufferFetcher.fetch(installRequest.uri()))) - ) - .subscribeOn(Schedulers.boundedElastic()) - .doOnError(throwable -> { - log.error("Failed to fetch zip file from uri.", throwable); - throw new ThemeInstallationException("Failed to fetch zip file from uri.", null, - null); - }) - .flatMap(themeService::install) - .flatMap(theme -> ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(theme) - ); + var content = request.bodyToMono(InstallFromUriRequest.class) + .map(InstallFromUriRequest::uri) + .flatMapMany(urlDataBufferFetcher::fetch); + + return themeService.install(content) + .flatMap(theme -> ServerResponse.ok().bodyValue(theme)); } private Mono activateTheme(ServerRequest request) { @@ -328,7 +302,7 @@ public class ThemeEndpoint implements CustomEndpoint { if (!configMapName.equals(configMapNameToUpdate)) { throw new ServerWebInputException( "The name from the request body does not match the theme " - + "configMapName name."); + + "configMapName name."); } }) .flatMap(configMapToUpdate -> client.fetch(ConfigMap.class, configMapName) @@ -442,22 +416,15 @@ public class ThemeEndpoint implements CustomEndpoint { private Mono upgrade(ServerRequest request) { // validate the theme first - var themeNameInPath = request.pathVariable("name"); + var name = request.pathVariable("name"); return request.multipartData() .map(UpgradeRequest::new) .map(UpgradeRequest::getFile) - .flatMap(file -> { - try { - return themeService.upgrade(themeNameInPath, toInputStream(file.content())); - } catch (IOException e) { - return Mono.error(e); - } - }) - .flatMap((updatedTheme) -> templateEngineManager.clearCache( - updatedTheme.getMetadata().getName()) - .thenReturn(updatedTheme)) - .flatMap(updatedTheme -> ServerResponse.ok() - .bodyValue(updatedTheme)); + .flatMap(filePart -> themeService.upgrade(name, filePart.content())) + .flatMap((updatedTheme) -> + templateEngineManager.clearCache(updatedTheme.getMetadata().getName()) + .thenReturn(updatedTheme)) + .flatMap(updatedTheme -> ServerResponse.ok().bodyValue(updatedTheme)); } Mono> listUninstalled(ThemeQuery query) { @@ -500,39 +467,39 @@ public class ThemeEndpoint implements CustomEndpoint { } @Schema(name = "ThemeInstallRequest") - public record InstallRequest( - @Schema(requiredMode = REQUIRED, description = "Theme zip file.") FilePart file) { + public static class InstallRequest { + + @Schema(hidden = true) + private final MultiValueMap multipartData; + + public InstallRequest(MultiValueMap multipartData) { + this.multipartData = multipartData; + } + + @Schema(requiredMode = REQUIRED, description = "Theme zip file.") + FilePart getFile() { + Part part = multipartData.getFirst("file"); + if (!(part instanceof FilePart file)) { + throw new ServerWebInputException( + "Invalid parameter of file, binary data is required"); + } + if (!Paths.get(file.filename()).toString().endsWith(".zip")) { + throw new ServerWebInputException( + "Invalid file type, only zip format is supported"); + } + return file; + } } public record InstallFromUriRequest(@Schema(requiredMode = REQUIRED) URI uri) { } Mono install(ServerRequest request) { - return request.body(BodyExtractors.toMultipartData()) - .flatMap(this::getZipFilePart) - .flatMap(file -> { - try { - return themeService.install(toInputStream(file.content())); - } catch (IOException e) { - return Mono.error(Exceptions.propagate(e)); - } - }) - .flatMap(theme -> ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(theme)); - } - - Mono getZipFilePart(MultiValueMap formData) { - Part part = formData.getFirst("file"); - if (!(part instanceof FilePart file)) { - return Mono.error(new ServerWebInputException( - "Invalid parameter of file, binary data is required")); - } - if (!Paths.get(file.filename()).toString().endsWith(".zip")) { - return Mono.error(new ServerWebInputException( - "Invalid file type, only zip format is supported")); - } - return Mono.just(file); + return request.multipartData() + .map(InstallRequest::new) + .map(InstallRequest::getFile) + .flatMap(filePart -> themeService.install(filePart.content())) + .flatMap(theme -> ServerResponse.ok().bodyValue(theme)); } } diff --git a/application/src/main/java/run/halo/app/core/extension/theme/ThemeService.java b/application/src/main/java/run/halo/app/core/extension/theme/ThemeService.java index 3bfe7e2e3..a3954a3fc 100644 --- a/application/src/main/java/run/halo/app/core/extension/theme/ThemeService.java +++ b/application/src/main/java/run/halo/app/core/extension/theme/ThemeService.java @@ -1,15 +1,16 @@ package run.halo.app.core.extension.theme; -import java.io.InputStream; +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; import reactor.core.publisher.Mono; import run.halo.app.core.extension.Theme; import run.halo.app.extension.ConfigMap; public interface ThemeService { - Mono install(InputStream is); + Mono install(Publisher content); - Mono upgrade(String themeName, InputStream is); + Mono upgrade(String themeName, Publisher content); Mono reloadTheme(String name); diff --git a/application/src/main/java/run/halo/app/core/extension/theme/ThemeServiceImpl.java b/application/src/main/java/run/halo/app/core/extension/theme/ThemeServiceImpl.java index 44c9f1bf6..df519bd8a 100644 --- a/application/src/main/java/run/halo/app/core/extension/theme/ThemeServiceImpl.java +++ b/application/src/main/java/run/halo/app/core/extension/theme/ThemeServiceImpl.java @@ -1,34 +1,33 @@ package run.halo.app.core.extension.theme; -import static java.nio.file.Files.createTempDirectory; import static org.springframework.util.FileSystemUtils.copyRecursively; import static run.halo.app.core.extension.theme.ThemeUtils.loadThemeManifest; import static run.halo.app.core.extension.theme.ThemeUtils.locateThemeManifest; +import static run.halo.app.core.extension.theme.ThemeUtils.unzipThemeTo; +import static run.halo.app.infra.utils.FileUtils.createTempDir; import static run.halo.app.infra.utils.FileUtils.deleteRecursivelyAndSilently; import static run.halo.app.infra.utils.FileUtils.unzip; -import java.io.IOException; -import java.io.InputStream; import java.nio.file.Path; import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; -import java.util.zip.ZipInputStream; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.retry.RetryException; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import org.springframework.web.server.ServerErrorException; import org.springframework.web.server.ServerWebInputException; -import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; import run.halo.app.core.extension.AnnotationSetting; @@ -55,76 +54,60 @@ public class ThemeServiceImpl implements ThemeService { private final SystemVersionSupplier systemVersionSupplier; + private final Scheduler scheduler = Schedulers.boundedElastic(); + @Override - public Mono install(InputStream is) { + public Mono install(Publisher content) { var themeRoot = this.themeRoot.get(); - return ThemeUtils.unzipThemeTo(is, themeRoot) + return unzipThemeTo(content, themeRoot, scheduler) .flatMap(this::persistent); } @Override - public Mono upgrade(String themeName, InputStream is) { - var tempDir = new AtomicReference(); - var tempThemeRoot = new AtomicReference(); - return client.fetch(Theme.class, themeName) + public Mono upgrade(String themeName, Publisher content) { + var checkTheme = client.fetch(Theme.class, themeName) .switchIfEmpty(Mono.error(() -> new ServerWebInputException( - "The given theme with name " + themeName + " did not exist"))) - .publishOn(Schedulers.boundedElastic()) - .doFirst(() -> { - try { - tempDir.set(createTempDirectory("halo-theme-")); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }) - .flatMap(oldTheme -> { - try (var zis = new ZipInputStream(is)) { - unzip(zis, tempDir.get()); - return locateThemeManifest(tempDir.get()).switchIfEmpty(Mono.error( - () -> new ThemeUpgradeException( - "Missing theme manifest file: theme.yaml or theme.yml", - "problemDetail.theme.upgrade.missingManifest", null))); - } catch (IOException e) { - return Mono.error(e); - } - }) - .doOnNext(themeManifest -> { - if (log.isDebugEnabled()) { - log.debug("Found theme manifest file: {}", themeManifest); - } - tempThemeRoot.set(themeManifest.getParent()); - }) - .map(ThemeUtils::loadThemeManifest) - .doOnNext(newTheme -> { - if (!Objects.equals(themeName, newTheme.getMetadata().getName())) { - if (log.isDebugEnabled()) { - log.error("Want theme name: {}, but provided: {}", themeName, - newTheme.getMetadata().getName()); - } - throw new ThemeUpgradeException("Please make sure the theme name is correct", - "problemDetail.theme.upgrade.nameMismatch", - new Object[] {newTheme.getMetadata().getName(), themeName}); - } - }) - .flatMap(newTheme -> { - // Remove the theme before upgrading - return deleteThemeAndWaitForComplete(newTheme.getMetadata().getName()) - .thenReturn(newTheme); - }) - .doOnNext(newTheme -> { - // prepare the theme - var themePath = themeRoot.get().resolve(newTheme.getMetadata().getName()); - try { - copyRecursively(tempThemeRoot.get(), themePath); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }) - .flatMap(this::persistent) - .doFinally(signalType -> { - // clear the temporary folder - deleteRecursivelyAndSilently(tempDir.get()); - }); + "The given theme with name " + themeName + " did not exist"))); + var upgradeTheme = Mono.usingWhen( + createTempDir("halo-theme-", scheduler), + tempDir -> { + var locateThemeManifest = Mono.fromCallable(() -> locateThemeManifest(tempDir) + .orElseThrow(() -> new ThemeUpgradeException( + "Missing theme manifest file: theme.yaml or theme.yml", + "problemDetail.theme.upgrade.missingManifest", null))); + return unzip(content, tempDir, scheduler) + .then(locateThemeManifest) + .flatMap(themeManifest -> { + if (log.isDebugEnabled()) { + log.debug("Found theme manifest file: {}", themeManifest); + } + var newTheme = loadThemeManifest(themeManifest); + if (!Objects.equals(themeName, newTheme.getMetadata().getName())) { + if (log.isDebugEnabled()) { + log.error("Want theme name: {}, but provided: {}", themeName, + newTheme.getMetadata().getName()); + } + return Mono.error(new ThemeUpgradeException( + "Please make sure the theme name is correct", + "problemDetail.theme.upgrade.nameMismatch", + new Object[] {newTheme.getMetadata().getName(), themeName})); + } + + var copyTheme = Mono.fromCallable(() -> { + var themePath = themeRoot.get().resolve(themeName); + copyRecursively(themeManifest.getParent(), themePath); + return themePath; + }); + + return deleteThemeAndWaitForComplete(themeName) + .then(copyTheme) + .then(this.persistent(newTheme)); + }); + }, + tempDir -> deleteRecursivelyAndSilently(tempDir, scheduler) + ); + + return checkTheme.then(upgradeTheme); } /** diff --git a/application/src/main/java/run/halo/app/core/extension/theme/ThemeUtils.java b/application/src/main/java/run/halo/app/core/extension/theme/ThemeUtils.java index 6de1b3f03..a0276a36e 100644 --- a/application/src/main/java/run/halo/app/core/extension/theme/ThemeUtils.java +++ b/application/src/main/java/run/halo/app/core/extension/theme/ThemeUtils.java @@ -1,12 +1,11 @@ package run.halo.app.core.extension.theme; -import static java.nio.file.Files.createTempDirectory; import static org.springframework.util.FileSystemUtils.copyRecursively; +import static run.halo.app.infra.utils.FileUtils.createTempDir; import static run.halo.app.infra.utils.FileUtils.deleteRecursivelyAndSilently; import static run.halo.app.infra.utils.FileUtils.unzip; import java.io.IOException; -import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -15,13 +14,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.BaseStream; import java.util.stream.Stream; -import java.util.zip.ZipInputStream; import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.lang.Nullable; import org.springframework.util.CollectionUtils; import org.springframework.web.server.ResponseStatusException; @@ -29,6 +28,7 @@ import org.springframework.web.server.ServerWebInputException; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import run.halo.app.core.extension.Theme; import run.halo.app.extension.Unstructured; @@ -101,64 +101,51 @@ class ThemeUtils { } } - static Mono unzipThemeTo(InputStream inputStream, Path themeWorkDir) { - return unzipThemeTo(inputStream, themeWorkDir, false) + static Mono unzipThemeTo(Publisher content, Path themeWorkDir, + Scheduler scheduler) { + return unzipThemeTo(content, themeWorkDir, false, scheduler) .onErrorMap(e -> !(e instanceof ResponseStatusException), e -> { log.error("Failed to unzip theme", e); throw new ServerWebInputException("Failed to unzip theme"); }); } - static Mono unzipThemeTo(InputStream inputStream, Path themeWorkDir, - boolean override) { - var tempDir = new AtomicReference(); - return Mono.just(inputStream) - .publishOn(Schedulers.boundedElastic()) - .doFirst(() -> { - try { - tempDir.set(createTempDirectory(THEME_TMP_PREFIX)); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }) - .doOnNext(is -> { - try (var zipIs = new ZipInputStream(is)) { - // unzip input stream into temporary directory - unzip(zipIs, tempDir.get()); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }) - .flatMap(is -> ThemeUtils.locateThemeManifest(tempDir.get())) - .switchIfEmpty( - Mono.error(() -> new ThemeInstallationException("Missing theme manifest", - "problemDetail.theme.install.missingManifest", null))) - .map(themeManifestPath -> { - var theme = loadThemeManifest(themeManifestPath); - var themeName = theme.getMetadata().getName(); - var themeTargetPath = themeWorkDir.resolve(themeName); - try { - if (!override && !FileUtils.isEmpty(themeTargetPath)) { - throw new ThemeAlreadyExistsException(themeName); - } - // install theme to theme work dir - copyRecursively(themeManifestPath.getParent(), themeTargetPath); - return theme; - } catch (IOException e) { - deleteRecursivelyAndSilently(themeTargetPath); - throw Exceptions.propagate(e); - } - }) - .doFinally(signalType -> { - FileUtils.closeQuietly(inputStream); - deleteRecursivelyAndSilently(tempDir.get()); - }); + static Mono unzipThemeTo(Publisher content, Path themeWorkDir, + boolean override, Scheduler scheduler) { + return Mono.usingWhen( + createTempDir(THEME_TMP_PREFIX, scheduler), + tempDir -> { + var locateThemeManifest = Mono.fromCallable(() -> locateThemeManifest(tempDir) + .orElseThrow(() -> new ThemeInstallationException("Missing theme manifest", + "problemDetail.theme.install.missingManifest", null))); + return unzip(content, tempDir, scheduler) + .then(locateThemeManifest) + .handle((themeManifestPath, sink) -> { + var theme = loadThemeManifest(themeManifestPath); + var themeName = theme.getMetadata().getName(); + var themeTargetPath = themeWorkDir.resolve(themeName); + try { + if (!override && !FileUtils.isEmpty(themeTargetPath)) { + sink.error(new ThemeAlreadyExistsException(themeName)); + return; + } + // install theme to theme work dir + copyRecursively(themeManifestPath.getParent(), themeTargetPath); + sink.next(theme); + } catch (IOException e) { + deleteRecursivelyAndSilently(themeTargetPath); + sink.error(e); + } + }) + .subscribeOn(scheduler); + }, + tempDir -> FileUtils.deleteRecursivelyAndSilently(tempDir, scheduler) + ); } static Unstructured loadThemeManifest(Path themeManifestPath) { - List unstructureds = - new YamlUnstructuredLoader(new FileSystemResource(themeManifestPath)) - .load(); + var unstructureds = new YamlUnstructuredLoader(new FileSystemResource(themeManifestPath)) + .load(); if (CollectionUtils.isEmpty(unstructureds)) { throw new ThemeInstallationException("Missing theme manifest", "problemDetail.theme.install.missingManifest", null); @@ -177,37 +164,35 @@ class ThemeUtils { return null; } - static Mono locateThemeManifest(Path dir) { - return Mono.justOrEmpty(dir) - .filter(Files::isDirectory) - .publishOn(Schedulers.boundedElastic()) - .mapNotNull(path -> { - var queue = new LinkedList(); - queue.add(dir); - var manifest = Optional.empty(); - while (!queue.isEmpty()) { - var current = queue.pop(); - try (Stream subPaths = Files.list(current)) { - manifest = subPaths.filter(Files::isReadable) - .filter(subPath -> { - if (Files.isDirectory(subPath)) { - queue.add(subPath); - return false; - } - return true; - }) - .filter(Files::isRegularFile) - .filter(ThemeUtils::isManifest) - .findFirst(); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - if (manifest.isPresent()) { - break; - } - } - return manifest.orElse(null); - }); + static Optional locateThemeManifest(Path path) { + if (!Files.isDirectory(path)) { + return Optional.empty(); + } + var queue = new LinkedList(); + queue.add(path); + var manifest = Optional.empty(); + while (!queue.isEmpty()) { + var current = queue.pop(); + try (Stream subPaths = Files.list(current)) { + manifest = subPaths.filter(Files::isReadable) + .filter(subPath -> { + if (Files.isDirectory(subPath)) { + queue.add(subPath); + return false; + } + return true; + }) + .filter(Files::isRegularFile) + .filter(ThemeUtils::isManifest) + .findFirst(); + } catch (IOException e) { + throw Exceptions.propagate(e); + } + if (manifest.isPresent()) { + break; + } + } + return manifest; } static boolean isManifest(Path file) { diff --git a/application/src/main/java/run/halo/app/infra/DefaultThemeInitializer.java b/application/src/main/java/run/halo/app/infra/DefaultThemeInitializer.java index 460853c9e..d8d74d1cb 100644 --- a/application/src/main/java/run/halo/app/infra/DefaultThemeInitializer.java +++ b/application/src/main/java/run/halo/app/infra/DefaultThemeInitializer.java @@ -1,12 +1,14 @@ package run.halo.app.infra; import java.io.IOException; -import java.util.concurrent.CountDownLatch; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; -import org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.core.io.UrlResource; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.stereotype.Component; import org.springframework.util.ResourceUtils; +import org.springframework.util.StreamUtils; import run.halo.app.core.extension.theme.ThemeService; import run.halo.app.infra.properties.HaloProperties; import run.halo.app.infra.properties.ThemeProperties; @@ -41,20 +43,18 @@ public class DefaultThemeInitializer implements ApplicationListener latch.countDown()) - .subscribe(theme -> log.info("Initialized default theme: {}", - theme.getMetadata().getName())); - latch.await(); + var themeUrl = ResourceUtils.getURL(location); + var content = DataBufferUtils.read(new UrlResource(themeUrl), + DefaultDataBufferFactory.sharedInstance, + StreamUtils.BUFFER_SIZE); + var theme = themeService.install(content).block(); + log.info("Initialized default theme: {}", theme); // Because default active theme is default, we don't need to enabled it manually. - } catch (IOException | InterruptedException e) { + } catch (IOException e) { // we should skip the initialization error at here log.warn("Failed to initialize theme from " + location, e); } diff --git a/application/src/main/java/run/halo/app/infra/utils/DataBufferUtils.java b/application/src/main/java/run/halo/app/infra/utils/DataBufferUtils.java index ee03cf89b..dc2717867 100644 --- a/application/src/main/java/run/halo/app/infra/utils/DataBufferUtils.java +++ b/application/src/main/java/run/halo/app/infra/utils/DataBufferUtils.java @@ -8,35 +8,36 @@ import java.io.InputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; import org.springframework.core.io.buffer.DataBuffer; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.util.context.Context; @Slf4j -public final class DataBufferUtils { +public enum DataBufferUtils { + ; - private DataBufferUtils() { + public static Mono toInputStream(Publisher content) { + return toInputStream(content, Schedulers.boundedElastic()); } - public static InputStream toInputStream(Flux content) throws IOException { - var pos = new PipedOutputStream(); - var pis = new PipedInputStream(pos); - write(content, pos) - .doOnComplete(() -> { - try { - pos.close(); - } catch (IOException ignored) { - // Ignore the error - } - }) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe(releaseConsumer(), error -> { - if (error instanceof IOException) { - // Ignore the error - return; - } - log.error("Failed to write DataBuffer into OutputStream", error); - }); - return pis; + public static Mono toInputStream(Publisher content, + Scheduler scheduler) { + return Mono.create(sink -> { + try { + var pos = new PipedOutputStream(); + var pis = new PipedInputStream(pos); + var disposable = write(content, pos) + .subscribeOn(scheduler) + .subscribe(releaseConsumer(), sink::error, () -> FileUtils.closeQuietly(pos), + Context.of(sink.contextView())); + sink.onDispose(disposable); + sink.success(pis); + } catch (IOException e) { + sink.error(e); + } + }); } } diff --git a/application/src/main/java/run/halo/app/infra/utils/FileUtils.java b/application/src/main/java/run/halo/app/infra/utils/FileUtils.java index 44bedaf08..ca790bfde 100644 --- a/application/src/main/java/run/halo/app/infra/utils/FileUtils.java +++ b/application/src/main/java/run/halo/app/infra/utils/FileUtils.java @@ -2,6 +2,7 @@ package run.halo.app.infra.utils; import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; import static org.springframework.util.FileSystemUtils.deleteRecursively; +import static run.halo.app.infra.utils.DataBufferUtils.toInputStream; import java.io.Closeable; import java.io.IOException; @@ -23,10 +24,13 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.lang.NonNull; import org.springframework.util.AntPathMatcher; import org.springframework.util.Assert; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import run.halo.app.infra.exception.AccessDeniedException; @@ -40,6 +44,26 @@ public abstract class FileUtils { private FileUtils() { } + public static Mono unzip(Publisher content, @NonNull Path targetPath) { + return unzip(content, targetPath, Schedulers.boundedElastic()); + } + + public static Mono unzip(Publisher content, @NonNull Path targetPath, + Scheduler scheduler) { + return Mono.usingWhen( + toInputStream(content, scheduler), + is -> { + try (var zis = new ZipInputStream(is)) { + unzip(zis, targetPath); + return Mono.empty(); + } catch (IOException e) { + return Mono.error(e); + } + }, + is -> Mono.fromRunnable(() -> closeQuietly(is)) + ); + } + public static void unzip(@NonNull ZipInputStream zis, @NonNull Path targetPath) throws IOException { // 1. unzip file to folder @@ -170,7 +194,7 @@ public abstract class FileUtils { * the given {@code consumer}. * * @param closeable The resource to close, may be null. - * @param consumer Consumes the IOException thrown by {@link Closeable#close()}. + * @param consumer Consumes the IOException thrown by {@link Closeable#close()}. */ public static void closeQuietly(final Closeable closeable, final Consumer consumer) { @@ -188,7 +212,7 @@ public abstract class FileUtils { /** * Checks directory traversal vulnerability. * - * @param parentPath parent path must not be null. + * @param parentPath parent path must not be null. * @param pathToCheck path to check must not be null */ public static void checkDirectoryTraversal(@NonNull Path parentPath, @@ -207,7 +231,7 @@ public abstract class FileUtils { /** * Checks directory traversal vulnerability. * - * @param parentPath parent path must not be null. + * @param parentPath parent path must not be null. * @param pathToCheck path to check must not be null */ public static void checkDirectoryTraversal(@NonNull String parentPath, @@ -218,7 +242,7 @@ public abstract class FileUtils { /** * Checks directory traversal vulnerability. * - * @param parentPath parent path must not be null. + * @param parentPath parent path must not be null. * @param pathToCheck path to check must not be null */ public static void checkDirectoryTraversal(@NonNull Path parentPath, @@ -237,15 +261,27 @@ public abstract class FileUtils { if (log.isDebugEnabled()) { log.debug("Delete {} result: {}", root, deleted); } - } catch (IOException e) { + } catch (IOException ignored) { // Ignore this error - if (log.isTraceEnabled()) { - log.trace("Failed to delete {} recursively", root); - } } } + public static Mono deleteRecursivelyAndSilently(Path root, Scheduler scheduler) { + return Mono.fromSupplier(() -> { + try { + return deleteRecursively(root); + } catch (IOException ignored) { + return false; + } + }).subscribeOn(scheduler); + } + + public static Mono deleteFileSilently(Path file) { + return deleteFileSilently(file, Schedulers.boundedElastic()); + } + + public static Mono deleteFileSilently(Path file, Scheduler scheduler) { return Mono.fromSupplier( () -> { if (file == null || !Files.isRegularFile(file)) { @@ -257,7 +293,7 @@ public abstract class FileUtils { return false; } }) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(scheduler); } public static void copy(Path source, Path dest, CopyOption... options) { @@ -295,4 +331,7 @@ public abstract class FileUtils { }); } + public static Mono createTempDir(String prefix, Scheduler scheduler) { + return Mono.fromCallable(() -> Files.createTempDirectory(prefix)).subscribeOn(scheduler); + } } diff --git a/application/src/main/java/run/halo/app/migration/impl/MigrationServiceImpl.java b/application/src/main/java/run/halo/app/migration/impl/MigrationServiceImpl.java index 04965de4a..40d81fed6 100644 --- a/application/src/main/java/run/halo/app/migration/impl/MigrationServiceImpl.java +++ b/application/src/main/java/run/halo/app/migration/impl/MigrationServiceImpl.java @@ -1,38 +1,37 @@ package run.halo.app.migration.impl; -import static org.springframework.core.io.buffer.DataBufferUtils.releaseConsumer; -import static run.halo.app.infra.utils.FileUtils.closeQuietly; +import static java.nio.file.Files.deleteIfExists; +import static org.springframework.util.FileSystemUtils.copyRecursively; +import static run.halo.app.infra.utils.FileUtils.checkDirectoryTraversal; +import static run.halo.app.infra.utils.FileUtils.copyRecursively; +import static run.halo.app.infra.utils.FileUtils.createTempDir; import static run.halo.app.infra.utils.FileUtils.deleteRecursivelyAndSilently; +import static run.halo.app.infra.utils.FileUtils.unzip; import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; import java.io.IOException; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Locale; import java.util.Set; -import java.util.zip.ZipInputStream; import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Publisher; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import org.springframework.util.FileSystemUtils; import org.springframework.web.server.ServerWebInputException; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import reactor.util.context.Context; import run.halo.app.extension.store.ExtensionStore; import run.halo.app.extension.store.ExtensionStoreRepository; import run.halo.app.infra.exception.NotFoundException; @@ -67,6 +66,8 @@ public class MigrationServiceImpl implements MigrationService { private final DateTimeFormatter dateTimeFormatter; + private final Scheduler scheduler = Schedulers.boundedElastic(); + public MigrationServiceImpl(ExtensionStoreRepository repository, HaloProperties haloProperties) { this.repository = repository; @@ -94,55 +95,51 @@ public class MigrationServiceImpl implements MigrationService { @Override public Mono backup(Backup backup) { - try { - // create temporary folder to store all backup files into single files. - var tempDir = Files.createTempDirectory("halo-full-backup-"); - return backupExtensions(tempDir) - .and(backupWorkDir(tempDir)) - .and(packageBackup(tempDir, backup)) - .doFinally(signalType -> deleteRecursivelyAndSilently(tempDir)) - .subscribeOn(Schedulers.boundedElastic()); - } catch (IOException e) { - return Mono.error(e); - } + return Mono.usingWhen( + createTempDir("halo-full-backup-", scheduler), + tempDir -> backupExtensions(tempDir) + .then(Mono.defer(() -> backupWorkDir(tempDir))) + .then(Mono.defer(() -> packageBackup(tempDir, backup))), + tempDir -> deleteRecursivelyAndSilently(tempDir, scheduler) + ); } @Override public Mono download(Backup backup) { - var status = backup.getStatus(); - if (!Backup.Phase.SUCCEEDED.equals(status.getPhase()) || status.getFilename() == null) { - return Mono.error(new ServerWebInputException("Current backup is not downloadable.")); - } - - var backupFile = getBackupsRoot().resolve(status.getFilename()); - var resource = new FileSystemResource(backupFile); - if (!resource.exists()) { - return Mono.error(new NotFoundException("problemDetail.migration.backup.notFound", - new Object[] {}, "Backup file doesn't exist or deleted.")); - } - return Mono.just(resource); + return Mono.create(sink -> { + var status = backup.getStatus(); + if (!Backup.Phase.SUCCEEDED.equals(status.getPhase()) || status.getFilename() == null) { + sink.error(new ServerWebInputException("Current backup is not downloadable.")); + return; + } + var backupFile = getBackupsRoot().resolve(status.getFilename()); + var resource = new FileSystemResource(backupFile); + if (!resource.exists()) { + sink.error( + new NotFoundException("problemDetail.migration.backup.notFound", + new Object[] {}, + "Backup file doesn't exist or deleted.")); + return; + } + sink.success(resource); + }); } @Override @Transactional public Mono restore(Publisher content) { - return Mono.defer(() -> { - try { - var tempDir = Files.createTempDirectory("halo-restore-"); - return unpackBackup(content, tempDir) - .and(restoreExtensions(tempDir)) - .and(restoreWorkdir(tempDir)) - .doFinally(signalType -> deleteRecursivelyAndSilently(tempDir)) - .subscribeOn(Schedulers.boundedElastic()); - } catch (IOException e) { - return Mono.error(e); - } - }); + return Mono.usingWhen( + createTempDir("halo-restore-", scheduler), + tempDir -> unpackBackup(content, tempDir) + .then(Mono.defer(() -> restoreExtensions(tempDir))) + .then(Mono.defer(() -> restoreWorkdir(tempDir))), + tempDir -> deleteRecursivelyAndSilently(tempDir, scheduler) + ); } @Override public Mono cleanup(Backup backup) { - return Mono.fromRunnable(() -> { + return Mono.create(sink -> { var status = backup.getStatus(); if (status == null || status.getFilename() == null) { return; @@ -151,110 +148,104 @@ public class MigrationServiceImpl implements MigrationService { var backupsRoot = getBackupsRoot(); var backupFile = backupsRoot.resolve(filename); try { - FileUtils.checkDirectoryTraversal(backupsRoot, backupFile); - Files.deleteIfExists(backupFile); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }).subscribeOn(Schedulers.boundedElastic()); - } - - private Mono restoreWorkdir(Path backupRoot) { - return Mono.fromRunnable(() -> { - try { - var workdir = backupRoot.resolve("workdir"); - if (Files.exists(workdir)) { - FileSystemUtils.copyRecursively(workdir, haloProperties.getWorkDir()); - } - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }); - } - - private Mono restoreExtensions(Path backupRoot) { - var extensionsPath = backupRoot.resolve("extensions.data"); - var reader = objectMapper.readerFor(ExtensionStore.class); - return Mono.>using( - () -> reader.readValues(extensionsPath.toFile()), - itr -> Flux.create( - sink -> { - while (itr.hasNext()) { - sink.next(itr.next()); - } - sink.complete(); - }) - // reset version - .doOnNext(extensionStore -> extensionStore.setVersion(null)) - .buffer(100) - // We might encounter OptimisticLockingFailureException when saving extension store, - // So we have to delete all extension stores before saving. - .flatMap(extensionStores -> repository.deleteAll(extensionStores) - .thenMany(repository.saveAll(extensionStores))) - .doOnNext(extensionStore -> - log.info("Restored extension store: {}", extensionStore.getName())) - .then(), - itr -> { - try { - itr.close(); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }); - } - - private Mono unpackBackup(Publisher content, Path target) { - return Mono.create(sink -> { - try (var pipedIs = new PipedInputStream(); - var pipedOs = new PipedOutputStream(pipedIs); - var zipIs = new ZipInputStream(pipedIs)) { - DataBufferUtils.write(content, pipedOs) - .subscribe( - releaseConsumer(), - sink::error, - () -> closeQuietly(pipedOs), - Context.of(sink.contextView())); - FileUtils.unzip(zipIs, target); + checkDirectoryTraversal(backupsRoot, backupFile); + deleteIfExists(backupFile); sink.success(); } catch (IOException e) { sink.error(e); } - }); + }).subscribeOn(scheduler); + } + + private Mono restoreWorkdir(Path backupRoot) { + return Mono.create(sink -> { + try { + var workdir = backupRoot.resolve("workdir"); + if (Files.exists(workdir)) { + copyRecursively(workdir, haloProperties.getWorkDir()); + } + sink.success(); + } catch (IOException e) { + sink.error(e); + } + }).subscribeOn(scheduler); + } + + private Mono restoreExtensions(Path backupRoot) { + var extensionsPath = backupRoot.resolve("extensions.data"); + if (Files.notExists(extensionsPath)) { + return Mono.empty(); + } + var reader = objectMapper.readerFor(ExtensionStore.class); + return Mono.>using( + () -> reader.readValues(extensionsPath.toFile()), + itr -> Flux.create( + sink -> { + while (itr.hasNext()) { + sink.next(itr.next()); + } + sink.complete(); + }) + // reset version + .doOnNext(extensionStore -> extensionStore.setVersion(null)).buffer(100) + // We might encounter OptimisticLockingFailureException when saving extension + // store, + // So we have to delete all extension stores before saving. + .flatMap(extensionStores -> repository.deleteAll(extensionStores) + .thenMany(repository.saveAll(extensionStores)) + ) + .doOnNext(extensionStore -> log.info("Restored extension store: {}", + extensionStore.getName())) + .then(), + FileUtils::closeQuietly) + .subscribeOn(scheduler); + } + + private Mono unpackBackup(Publisher content, Path target) { + return unzip(content, target, scheduler); } private Mono packageBackup(Path baseDir, Backup backup) { - return Mono.fromRunnable(() -> { - try { - var backupsFolder = getBackupsRoot(); - Files.createDirectories(backupsFolder); + return Mono.fromCallable( + () -> { + var backupsFolder = getBackupsRoot(); + Files.createDirectories(backupsFolder); + return backupsFolder; + }) + .handle((backupsFolder, sink) -> { var backupName = backup.getMetadata().getName(); var startTimestamp = backup.getStatus().getStartTimestamp(); var timePart = this.dateTimeFormatter.format(startTimestamp); var backupFile = backupsFolder.resolve(timePart + '-' + backupName + ".zip"); - FileUtils.zip(baseDir, backupFile); - backup.getStatus().setFilename(backupFile.getFileName().toString()); - backup.getStatus().setSize(Files.size(backupFile)); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }); + try { + FileUtils.zip(baseDir, backupFile); + backup.getStatus().setFilename(backupFile.getFileName().toString()); + backup.getStatus().setSize(Files.size(backupFile)); + sink.complete(); + } catch (IOException e) { + sink.error(e); + } + }) + .subscribeOn(scheduler); } private Mono backupWorkDir(Path baseDir) { - return Mono.fromRunnable(() -> { - try { - var workdirPath = Files.createDirectory(baseDir.resolve("workdir")); - FileUtils.copyRecursively(haloProperties.getWorkDir(), workdirPath, excludes); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }); + return Mono.fromCallable(() -> Files.createDirectory(baseDir.resolve("workdir"))) + .handle((workdirPath, sink) -> { + try { + copyRecursively(haloProperties.getWorkDir(), workdirPath, excludes); + sink.complete(); + } catch (IOException e) { + sink.error(e); + } + }) + .subscribeOn(scheduler); } private Mono backupExtensions(Path baseDir) { - try { - var extensionsPath = Files.createFile(baseDir.resolve("extensions.data")); - return Mono.using(() -> objectMapper.writerFor(ExtensionStore.class) + return Mono.fromCallable(() -> Files.createFile(baseDir.resolve("extensions.data"))) + .flatMap(extensionsPath -> Mono.using( + () -> objectMapper.writerFor(ExtensionStore.class) .writeValuesAsArray(extensionsPath.toFile()), seqWriter -> repository.findAll() .doOnNext(extensionStore -> { @@ -263,17 +254,8 @@ public class MigrationServiceImpl implements MigrationService { } catch (IOException e) { throw Exceptions.propagate(e); } - }) - .then(), - seqWriter -> { - try { - seqWriter.close(); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - }); - } catch (IOException e) { - return Mono.error(e); - } + }).then(), + FileUtils::closeQuietly)) + .subscribeOn(scheduler); } } diff --git a/application/src/test/java/run/halo/app/core/extension/theme/ThemeEndpointTest.java b/application/src/test/java/run/halo/app/core/extension/theme/ThemeEndpointTest.java index 7425fad48..4fc3194bb 100644 --- a/application/src/test/java/run/halo/app/core/extension/theme/ThemeEndpointTest.java +++ b/application/src/test/java/run/halo/app/core/extension/theme/ThemeEndpointTest.java @@ -4,7 +4,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -12,7 +11,6 @@ import static org.springframework.web.reactive.function.BodyInserters.fromMultip import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; @@ -24,15 +22,14 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.reactivestreams.Publisher; import org.springframework.core.io.FileSystemResource; -import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.MediaType; import org.springframework.http.client.MultipartBodyBuilder; import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.util.FileSystemUtils; import org.springframework.util.ResourceUtils; import org.springframework.web.server.ServerWebInputException; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import run.halo.app.core.extension.Setting; import run.halo.app.core.extension.Theme; @@ -70,7 +67,7 @@ class ThemeEndpointTest { private SystemConfigurableEnvironmentFetcher environmentFetcher; @Mock - private ReactiveUrlDataBufferFetcher reactiveUrlDataBufferFetcher; + private ReactiveUrlDataBufferFetcher urlDataBufferFetcher; @InjectMocks ThemeEndpoint themeEndpoint; @@ -105,7 +102,7 @@ class ThemeEndpointTest { bodyBuilder.part("file", new FileSystemResource(defaultTheme)) .contentType(MediaType.MULTIPART_FORM_DATA); - when(themeService.upgrade(eq("invalid-missing-manifest"), isA(InputStream.class))) + when(themeService.upgrade(eq("invalid-missing-manifest"), isA(Publisher.class))) .thenReturn( Mono.error(() -> new ServerWebInputException("Failed to upgrade theme"))); @@ -115,7 +112,7 @@ class ThemeEndpointTest { .exchange() .expectStatus().isBadRequest(); - verify(themeService).upgrade(eq("invalid-missing-manifest"), isA(InputStream.class)); + verify(themeService).upgrade(eq("invalid-missing-manifest"), isA(Publisher.class)); } @Test @@ -129,7 +126,7 @@ class ThemeEndpointTest { var newTheme = new Theme(); newTheme.setMetadata(metadata); - when(themeService.upgrade(eq("default"), isA(InputStream.class))) + when(themeService.upgrade(eq("default"), isA(Publisher.class))) .thenReturn(Mono.just(newTheme)); when(templateEngineManager.clearCache(eq("default"))) @@ -141,22 +138,20 @@ class ThemeEndpointTest { .exchange() .expectStatus().isOk(); - verify(themeService).upgrade(eq("default"), isA(InputStream.class)); + verify(themeService).upgrade(eq("default"), isA(Publisher.class)); verify(templateEngineManager, times(1)).clearCache(eq("default")); } @Test void upgradeFromUri() { - final URI uri = URI.create("https://example.com/test-theme.zip"); - Theme fakeTheme = mock(Theme.class); - Metadata metadata = new Metadata(); + var uri = URI.create("https://example.com/test-theme.zip"); + var metadata = new Metadata(); metadata.setName("default"); - when(fakeTheme.getMetadata()).thenReturn(metadata); - when(themeService.upgrade(eq("default"), isA(InputStream.class))) + var fakeTheme = new Theme(); + fakeTheme.setMetadata(metadata); + when(themeService.upgrade(eq("default"), any())) .thenReturn(Mono.just(fakeTheme)); - when(reactiveUrlDataBufferFetcher.fetch(eq(uri))) - .thenReturn(Flux.just(mock(DataBuffer.class))); when(templateEngineManager.clearCache(eq("default"))) .thenReturn(Mono.empty()); var body = new ThemeEndpoint.UpgradeFromUriRequest(uri); @@ -164,13 +159,12 @@ class ThemeEndpointTest { .uri("/themes/default/upgrade-from-uri") .bodyValue(body) .exchange() - .expectStatus().isOk(); + .expectStatus().isOk() + .expectBody(Theme.class).isEqualTo(fakeTheme); - verify(themeService).upgrade(eq("default"), isA(InputStream.class)); + verify(themeService).upgrade(eq("default"), any()); verify(templateEngineManager, times(1)).clearCache(eq("default")); - - verify(reactiveUrlDataBufferFetcher).fetch(eq(uri)); } } @@ -210,20 +204,21 @@ class ThemeEndpointTest { @Test void installFromUri() { final URI uri = URI.create("https://example.com/test-theme.zip"); - Theme fakeTheme = mock(Theme.class); - when(themeService.install(isA(InputStream.class))) - .thenReturn(Mono.just(fakeTheme)); - when(reactiveUrlDataBufferFetcher.fetch(eq(uri))) - .thenReturn(Flux.just(mock(DataBuffer.class))); + var metadata = new Metadata(); + metadata.setName("fake-theme"); + var theme = new Theme(); + theme.setMetadata(metadata); + + when(themeService.install(any())).thenReturn(Mono.just(theme)); var body = new ThemeEndpoint.UpgradeFromUriRequest(uri); webTestClient.post() .uri("/themes/-/install-from-uri") .bodyValue(body) .exchange() - .expectStatus().isOk(); + .expectStatus().isOk() + .expectBody(Theme.class).isEqualTo(theme); - verify(themeService).install(isA(InputStream.class)); - verify(reactiveUrlDataBufferFetcher).fetch(eq(uri)); + verify(themeService).install(any()); } @Test diff --git a/application/src/test/java/run/halo/app/core/extension/theme/ThemeServiceImplTest.java b/application/src/test/java/run/halo/app/core/extension/theme/ThemeServiceImplTest.java index eed3f5285..045b5f14e 100644 --- a/application/src/test/java/run/halo/app/core/extension/theme/ThemeServiceImplTest.java +++ b/application/src/test/java/run/halo/app/core/extension/theme/ThemeServiceImplTest.java @@ -32,7 +32,11 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; import org.skyscreamer.jsonassert.JSONAssert; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.util.ResourceUtils; +import org.springframework.util.StreamUtils; import org.springframework.web.server.ServerWebInputException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -112,6 +116,13 @@ class ThemeServiceImplTest { return Unstructured.OBJECT_MAPPER.convertValue(theme, Unstructured.class); } + Flux content(Path path) { + return DataBufferUtils.read( + path, + DefaultDataBufferFactory.sharedInstance, + StreamUtils.BUFFER_SIZE); + } + @Nested class UpgradeTest { @@ -119,10 +130,8 @@ class ThemeServiceImplTest { void shouldFailIfThemeNotInstalledBefore() throws IOException, URISyntaxException { var themeZipPath = prepareTheme("other"); when(client.fetch(Theme.class, "default")).thenReturn(Mono.empty()); - try (var is = Files.newInputStream(themeZipPath)) { - StepVerifier.create(themeService.upgrade("default", is)) - .verifyError(ServerWebInputException.class); - } + StepVerifier.create(themeService.upgrade("default", content(themeZipPath))) + .verifyError(ServerWebInputException.class); verify(client).fetch(Theme.class, "default"); } @@ -144,14 +153,12 @@ class ThemeServiceImplTest { when(client.create(isA(Unstructured.class))).thenReturn( Mono.just(convert(createTheme(t -> t.getSpec().setDisplayName("New fake theme"))))); - try (var is = Files.newInputStream(themeZipPath)) { - StepVerifier.create(themeService.upgrade("default", is)) - .consumeNextWith(newTheme -> { - assertEquals("default", newTheme.getMetadata().getName()); - assertEquals("New fake theme", newTheme.getSpec().getDisplayName()); - }) - .verifyComplete(); - } + StepVerifier.create(themeService.upgrade("default", content(themeZipPath))) + .consumeNextWith(newTheme -> { + assertEquals("default", newTheme.getMetadata().getName()); + assertEquals("New fake theme", newTheme.getSpec().getDisplayName()); + }) + .verifyComplete(); verify(client, times(3)).fetch(Theme.class, "default"); verify(client).delete(oldTheme); @@ -168,14 +175,12 @@ class ThemeServiceImplTest { var defaultThemeZipPath = prepareTheme("default"); when(client.create(isA(Unstructured.class))).thenReturn( Mono.just(convert(createTheme()))); - try (var is = Files.newInputStream(defaultThemeZipPath)) { - StepVerifier.create(themeService.install(is)) - .consumeNextWith(theme -> { - assertEquals("default", theme.getMetadata().getName()); - assertEquals("Default", theme.getSpec().getDisplayName()); - }) - .verifyComplete(); - } + StepVerifier.create(themeService.install(content(defaultThemeZipPath))) + .consumeNextWith(theme -> { + assertEquals("default", theme.getMetadata().getName()); + assertEquals("Default", theme.getSpec().getDisplayName()); + }) + .verifyComplete(); } @Test @@ -183,19 +188,15 @@ class ThemeServiceImplTest { var defaultThemeZipPath = prepareTheme("default"); when(client.create(isA(Unstructured.class))).thenReturn( Mono.error(() -> new ExtensionException("Failed to create the extension"))); - try (var is = Files.newInputStream(defaultThemeZipPath)) { - StepVerifier.create(themeService.install(is)) - .verifyError(ExtensionException.class); - } + StepVerifier.create(themeService.install(content(defaultThemeZipPath))) + .verifyError(ExtensionException.class); } @Test void shouldFailWhenThemeManifestIsInvalid() throws IOException, URISyntaxException { var defaultThemeZipPath = prepareTheme("invalid-missing-manifest"); - try (var is = Files.newInputStream(defaultThemeZipPath)) { - StepVerifier.create(themeService.install(is)) - .verifyError(ThemeInstallationException.class); - } + StepVerifier.create(themeService.install(content(defaultThemeZipPath))) + .verifyError(ThemeInstallationException.class); } }