diff --git a/src/main/java/run/halo/app/config/ExtensionConfiguration.java b/src/main/java/run/halo/app/config/ExtensionConfiguration.java index cd388de9e..d7ef8f3b6 100644 --- a/src/main/java/run/halo/app/config/ExtensionConfiguration.java +++ b/src/main/java/run/halo/app/config/ExtensionConfiguration.java @@ -5,6 +5,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.ServerResponse; +import run.halo.app.core.extension.User; +import run.halo.app.core.extension.reconciler.UserReconciler; import run.halo.app.extension.DefaultExtensionClient; import run.halo.app.extension.DefaultSchemeManager; import run.halo.app.extension.DefaultSchemeWatcherManager; @@ -14,6 +16,8 @@ import run.halo.app.extension.JSONExtensionConverter; import run.halo.app.extension.SchemeManager; import run.halo.app.extension.SchemeWatcherManager; import run.halo.app.extension.SchemeWatcherManager.SchemeWatcher; +import run.halo.app.extension.controller.Controller; +import run.halo.app.extension.controller.ControllerBuilder; import run.halo.app.extension.store.ExtensionStoreClient; @Configuration(proxyBeanMethods = false) @@ -40,4 +44,12 @@ public class ExtensionConfiguration { SchemeWatcherManager schemeWatcherManager() { return new DefaultSchemeWatcherManager(); } + + @Bean + Controller userController(ExtensionClient client) { + return new ControllerBuilder("user-controller", client) + .reconciler(new UserReconciler(client)) + .extension(new User()) + .build(); + } } diff --git a/src/main/java/run/halo/app/core/extension/reconciler/UserReconciler.java b/src/main/java/run/halo/app/core/extension/reconciler/UserReconciler.java new file mode 100644 index 000000000..8cb800f57 --- /dev/null +++ b/src/main/java/run/halo/app/core/extension/reconciler/UserReconciler.java @@ -0,0 +1,22 @@ +package run.halo.app.core.extension.reconciler; + +import lombok.extern.slf4j.Slf4j; +import run.halo.app.extension.ExtensionClient; +import run.halo.app.extension.controller.Reconciler; + +@Slf4j +public class UserReconciler implements Reconciler { + + private final ExtensionClient client; + + public UserReconciler(ExtensionClient client) { + this.client = client; + } + + @Override + public Result reconcile(Request request) { + //TODO Add reconciliation logic here for User extension. + return new Result(false, null); + } + +} diff --git a/src/main/java/run/halo/app/extension/DefaultExtensionClient.java b/src/main/java/run/halo/app/extension/DefaultExtensionClient.java index 95ba8e030..c6b731e3c 100644 --- a/src/main/java/run/halo/app/extension/DefaultExtensionClient.java +++ b/src/main/java/run/halo/app/extension/DefaultExtensionClient.java @@ -9,7 +9,6 @@ import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.support.PageableExecutionUtils; import org.springframework.util.Assert; -import run.halo.app.extension.store.ExtensionStore; import run.halo.app.extension.store.ExtensionStoreClient; /** @@ -24,12 +23,16 @@ public class DefaultExtensionClient implements ExtensionClient { private final SchemeManager schemeManager; + private final Watcher.WatcherComposite watchers; + public DefaultExtensionClient(ExtensionStoreClient storeClient, ExtensionConverter converter, SchemeManager schemeManager) { this.storeClient = storeClient; this.converter = converter; this.schemeManager = schemeManager; + + watchers = new Watcher.WatcherComposite(); } @Override @@ -82,7 +85,9 @@ public class DefaultExtensionClient implements ExtensionClient { metadata.setCreationTimestamp(Instant.now()); // extension.setMetadata(metadata); var extensionStore = converter.convertTo(extension); - storeClient.create(extensionStore.getName(), extensionStore.getData()); + var createdStore = storeClient.create(extensionStore.getName(), extensionStore.getData()); + var createdExt = converter.convertFrom(extension.getClass(), createdStore); + watchers.onAdd(createdExt); } @Override @@ -90,14 +95,23 @@ public class DefaultExtensionClient implements ExtensionClient { var extensionStore = converter.convertTo(extension); Assert.notNull(extension.getMetadata().getVersion(), "Extension version must not be null when updating"); - storeClient.update(extensionStore.getName(), extensionStore.getVersion(), + var updatedStore = storeClient.update(extensionStore.getName(), extensionStore.getVersion(), extensionStore.getData()); + var updatedExt = converter.convertFrom(extension.getClass(), updatedStore); + watchers.onUpdate(extension, updatedExt); } @Override public void delete(E extension) { - ExtensionStore extensionStore = converter.convertTo(extension); - storeClient.delete(extensionStore.getName(), extensionStore.getVersion()); + var extensionStore = converter.convertTo(extension); + var deleteStore = storeClient.delete(extensionStore.getName(), extensionStore.getVersion()); + Extension deleteExt = converter.convertFrom(extension.getClass(), deleteStore); + watchers.onDelete(deleteExt); + } + + @Override + public void watch(Watcher watcher) { + this.watchers.addWatcher(watcher); } } diff --git a/src/main/java/run/halo/app/extension/ExtensionClient.java b/src/main/java/run/halo/app/extension/ExtensionClient.java index 3027a2cd1..486fa23ec 100644 --- a/src/main/java/run/halo/app/extension/ExtensionClient.java +++ b/src/main/java/run/halo/app/extension/ExtensionClient.java @@ -18,8 +18,8 @@ public interface ExtensionClient { * Lists Extensions by Extension type, filter and sorter. * * @param type is the class type of Extension. - * @param predicate filters the result. - * @param comparator sorts the result. + * @param predicate filters the reEnqueue. + * @param comparator sorts the reEnqueue. * @param is Extension type. * @return all filtered and sorted Extensions. */ @@ -30,8 +30,8 @@ public interface ExtensionClient { * Lists Extensions by Extension type, filter, sorter and page info. * * @param type is the class type of Extension. - * @param predicate filters the result. - * @param comparator sorts the result. + * @param predicate filters the reEnqueue. + * @param comparator sorts the reEnqueue. * @param page is page number which starts from 0. * @param size is page size. * @param is Extension type. @@ -80,4 +80,6 @@ public interface ExtensionClient { */ void delete(E extension); + void watch(Watcher watcher); + } diff --git a/src/main/java/run/halo/app/extension/Watcher.java b/src/main/java/run/halo/app/extension/Watcher.java new file mode 100644 index 000000000..4fa9bc967 --- /dev/null +++ b/src/main/java/run/halo/app/extension/Watcher.java @@ -0,0 +1,84 @@ +package run.halo.app.extension; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import reactor.core.Disposable; + +public interface Watcher extends Disposable { + + default void onAdd(Extension extension) { + // Do nothing here + } + + default void onUpdate(Extension oldExtension, Extension newExtension) { + // Do nothing here + } + + default void onDelete(Extension extension) { + // Do nothing here + } + + default void registerDisposeHook(Runnable dispose) { + } + + class WatcherComposite implements Watcher { + + private final List watchers; + + private volatile boolean disposed = false; + + private Runnable disposeHook; + + public WatcherComposite() { + watchers = new CopyOnWriteArrayList<>(); + } + + @Override + public void onAdd(Extension extension) { + // TODO Deep copy extension and execute onAdd asynchronously + watchers.forEach(watcher -> watcher.onAdd(extension)); + } + + @Override + public void onUpdate(Extension oldExtension, Extension newExtension) { + // TODO Deep copy extension and execute onUpdate asynchronously + watchers.forEach(watcher -> watcher.onUpdate(oldExtension, newExtension)); + } + + @Override + public void onDelete(Extension extension) { + // TODO Deep copy extension and execute onDelete asynchronously + watchers.forEach(watcher -> watcher.onDelete(extension)); + } + + public void addWatcher(Watcher watcher) { + if (!watcher.isDisposed() && !watchers.contains(watcher)) { + watchers.add(watcher); + watcher.registerDisposeHook(() -> removeWatcher(watcher)); + } + } + + public void removeWatcher(Watcher watcher) { + watchers.remove(watcher); + } + + @Override + public void registerDisposeHook(Runnable dispose) { + this.disposeHook = dispose; + } + + @Override + public void dispose() { + this.disposed = true; + this.watchers.clear(); + if (this.disposeHook != null) { + this.disposeHook.run(); + } + } + + @Override + public boolean isDisposed() { + return this.disposed; + } + } +} diff --git a/src/main/java/run/halo/app/extension/WatcherPredicates.java b/src/main/java/run/halo/app/extension/WatcherPredicates.java new file mode 100644 index 000000000..62a6afd3d --- /dev/null +++ b/src/main/java/run/halo/app/extension/WatcherPredicates.java @@ -0,0 +1,99 @@ +package run.halo.app.extension; + +import java.util.function.BiPredicate; +import java.util.function.Predicate; + +public class WatcherPredicates { + + static final Predicate EMPTY_PREDICATE = (e) -> true; + + static final BiPredicate EMPTY_BI_PREDICATE = (oldExt, newExt) -> true; + private final Predicate onAddPredicate; + private final BiPredicate onUpdatePredicate; + private final Predicate onDeletePredicate; + + public WatcherPredicates(Predicate onAddPredicate, + BiPredicate onUpdatePredicate, + Predicate onDeletePredicate) { + this.onAddPredicate = onAddPredicate; + this.onUpdatePredicate = onUpdatePredicate; + this.onDeletePredicate = onDeletePredicate; + } + + public Predicate onAddPredicate() { + if (onAddPredicate == null) { + return EMPTY_PREDICATE; + } + return onAddPredicate; + } + + public BiPredicate onUpdatePredicate() { + if (onUpdatePredicate == null) { + return EMPTY_BI_PREDICATE; + } + return onUpdatePredicate; + } + + public Predicate onDeletePredicate() { + if (onDeletePredicate == null) { + return EMPTY_PREDICATE; + } + return onDeletePredicate; + } + + public static final class Builder { + + private Predicate onAddPredicate; + private BiPredicate onUpdatePredicate; + private Predicate onDeletePredicate; + + private GroupVersionKind gvk; + + public Builder withGroupVersionKind(GroupVersionKind gvk) { + this.gvk = gvk; + return this; + } + + public Builder onAddPredicate(Predicate onAddPredicate) { + this.onAddPredicate = onAddPredicate; + return this; + } + + public Builder onUpdatePredicate( + BiPredicate onUpdatePredicate) { + this.onUpdatePredicate = onUpdatePredicate; + return this; + } + + public Builder onDeletePredicate(Predicate onDeletePredicate) { + this.onDeletePredicate = onDeletePredicate; + return this; + } + + public WatcherPredicates build() { + Predicate gvkPredicate = EMPTY_PREDICATE; + BiPredicate gvkBiPredicate = EMPTY_BI_PREDICATE; + if (gvk != null) { + gvkPredicate = e -> gvk.equals(e.groupVersionKind()); + gvkBiPredicate = (oldE, newE) -> oldE.groupVersionKind().equals(gvk) + && newE.groupVersionKind().equals(gvk); + } + if (onAddPredicate == null) { + onAddPredicate = EMPTY_PREDICATE; + } + if (onUpdatePredicate == null) { + onUpdatePredicate = EMPTY_BI_PREDICATE; + } + if (onDeletePredicate == null) { + onDeletePredicate = EMPTY_PREDICATE; + } + + onAddPredicate = gvkPredicate.and(onAddPredicate); + onUpdatePredicate = gvkBiPredicate.and(onUpdatePredicate); + onDeletePredicate = gvkPredicate.and(onDeletePredicate); + + return new WatcherPredicates(onAddPredicate, onUpdatePredicate, onDeletePredicate); + } + + } +} diff --git a/src/main/java/run/halo/app/extension/controller/Controller.java b/src/main/java/run/halo/app/extension/controller/Controller.java new file mode 100644 index 000000000..680f09b3d --- /dev/null +++ b/src/main/java/run/halo/app/extension/controller/Controller.java @@ -0,0 +1,11 @@ +package run.halo.app.extension.controller; + +import reactor.core.Disposable; + +public interface Controller extends Disposable { + + String getName(); + + void start(); + +} diff --git a/src/main/java/run/halo/app/extension/controller/ControllerBuilder.java b/src/main/java/run/halo/app/extension/controller/ControllerBuilder.java new file mode 100644 index 000000000..4ea79b82c --- /dev/null +++ b/src/main/java/run/halo/app/extension/controller/ControllerBuilder.java @@ -0,0 +1,120 @@ +package run.halo.app.extension.controller; + +import java.time.Duration; +import java.time.Instant; +import java.util.function.BiPredicate; +import java.util.function.Predicate; +import java.util.function.Supplier; +import org.springframework.util.Assert; +import run.halo.app.extension.Extension; +import run.halo.app.extension.ExtensionClient; +import run.halo.app.extension.WatcherPredicates; + +public class ControllerBuilder { + + private final String name; + + private Duration minDelay; + + private Duration maxDelay; + + private Reconciler reconciler; + + private Supplier nowSupplier; + + private Extension extension; + + private Predicate onAddPredicate; + + private Predicate onDeletePredicate; + + private BiPredicate onUpdatePredicate; + + private final ExtensionClient client; + + private boolean syncAllOnStart = true; + + public ControllerBuilder(String name, ExtensionClient client) { + Assert.hasText(name, "Extension name is required"); + Assert.notNull(client, "Extension client must not be null"); + this.name = name; + this.client = client; + } + + public ControllerBuilder minDelay(Duration minDelay) { + this.minDelay = minDelay; + return this; + } + + public ControllerBuilder maxDelay(Duration maxDelay) { + this.maxDelay = maxDelay; + return this; + } + + public ControllerBuilder reconciler(Reconciler reconciler) { + this.reconciler = reconciler; + return this; + } + + public ControllerBuilder nowSupplier(Supplier nowSupplier) { + this.nowSupplier = nowSupplier; + return this; + } + + public ControllerBuilder extension(Extension extension) { + this.extension = extension; + return this; + } + + public ControllerBuilder onAddPredicate(Predicate onAddPredicate) { + this.onAddPredicate = onAddPredicate; + return this; + } + + public ControllerBuilder onDeletePredicate(Predicate onDeletePredicate) { + this.onDeletePredicate = onDeletePredicate; + return this; + } + + public ControllerBuilder onUpdatePredicate( + BiPredicate onUpdatePredicate) { + this.onUpdatePredicate = onUpdatePredicate; + return this; + } + + public ControllerBuilder syncAllOnStart(boolean syncAllAtStart) { + this.syncAllOnStart = syncAllAtStart; + return this; + } + + public Controller build() { + if (nowSupplier == null) { + nowSupplier = Instant::now; + } + if (minDelay == null || minDelay.isNegative() || minDelay.isZero()) { + minDelay = Duration.ofMillis(5); + } + if (maxDelay == null || maxDelay.isNegative() || maxDelay.isZero()) { + maxDelay = Duration.ofSeconds(1000); + } + Assert.isTrue(minDelay.compareTo(maxDelay) <= 0, + "Min delay must be less than or equal to max delay"); + Assert.notNull(extension, "Extension must not be null"); + Assert.notNull(reconciler, "Reconciler must not be null"); + + var queue = new DefaultDelayQueue(nowSupplier, minDelay); + var predicates = new WatcherPredicates.Builder() + .withGroupVersionKind(extension.groupVersionKind()) + .onAddPredicate(onAddPredicate) + .onUpdatePredicate(onUpdatePredicate) + .onDeletePredicate(onDeletePredicate) + .build(); + var watcher = new ExtensionWatcher(queue, predicates); + var synchronizer = new RequestSynchronizer(syncAllOnStart, + client, + extension, + watcher, + predicates.onAddPredicate()); + return new DefaultController(name, reconciler, queue, synchronizer, minDelay, maxDelay); + } +} diff --git a/src/main/java/run/halo/app/extension/controller/ControllerManager.java b/src/main/java/run/halo/app/extension/controller/ControllerManager.java new file mode 100644 index 000000000..776cd4600 --- /dev/null +++ b/src/main/java/run/halo/app/extension/controller/ControllerManager.java @@ -0,0 +1,42 @@ +package run.halo.app.extension.controller; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class ControllerManager implements ApplicationListener, + DisposableBean { + + private final ApplicationContext applicationContext; + + public ControllerManager(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + + @Override + public void onApplicationEvent(ApplicationReadyEvent event) { + applicationContext.getBeansOfType(Controller.class).values().forEach(Controller::start); + } + + @Override + public void destroy() { + var controllers = + applicationContext.getBeansOfType(Controller.class).values(); + log.info("Shutting down {} controllers...", controllers.size()); + controllers.forEach( + controller -> { + try { + controller.dispose(); + } catch (Throwable t) { + log.error("Failed to dispose controller {}", controller.getName(), t); + } + }); + log.info("Shutdown {} controllers.", controllers.size()); + } + +} diff --git a/src/main/java/run/halo/app/extension/controller/DefaultController.java b/src/main/java/run/halo/app/extension/controller/DefaultController.java new file mode 100644 index 000000000..1ce920d62 --- /dev/null +++ b/src/main/java/run/halo/app/extension/controller/DefaultController.java @@ -0,0 +1,177 @@ +package run.halo.app.extension.controller; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StopWatch; +import run.halo.app.extension.controller.RequestQueue.DelayedEntry; + +@Slf4j +class DefaultController implements Controller { + + private final String name; + + private final Reconciler reconciler; + + private final Supplier nowSupplier; + + private final RequestQueue queue; + + private volatile boolean disposed = false; + + private volatile boolean started = false; + + private final ExecutorService executor; + + private final RequestSynchronizer synchronizer; + + private final Duration minDelay; + + private final Duration maxDelay; + + public DefaultController(String name, + Reconciler reconciler, + RequestQueue queue, + RequestSynchronizer synchronizer, + Duration minDelay, + Duration maxDelay) { + this(name, reconciler, queue, synchronizer, Instant::now, minDelay, maxDelay); + } + + public DefaultController(String name, + Reconciler reconciler, + RequestQueue queue, + RequestSynchronizer synchronizer, + Supplier nowSupplier, + Duration minDelay, + Duration maxDelay, + ExecutorService executor) { + this.name = name; + this.reconciler = reconciler; + this.nowSupplier = nowSupplier; + this.queue = queue; + this.synchronizer = synchronizer; + this.minDelay = minDelay; + this.maxDelay = maxDelay; + this.executor = executor; + } + + public DefaultController(String name, + Reconciler reconciler, + RequestQueue queue, + RequestSynchronizer synchronizer, + Supplier nowSupplier, + Duration minDelay, + Duration maxDelay) { + this(name, reconciler, queue, synchronizer, nowSupplier, minDelay, maxDelay, + Executors.newSingleThreadExecutor()); + } + + @Override + public String getName() { + return name; + } + + @Override + public void start() { + if (isStarted() || isDisposed()) { + log.warn("Controller {} is already started or disposed.", getName()); + return; + } + this.started = true; + log.info("Starting controller {}", name); + // TODO Make more workers run the reconciler. + executor.submit(this::run); + } + + protected void run() { + log.info("Controller {} started", name); + synchronizer.start(); + while (!this.isDisposed() && !Thread.currentThread().isInterrupted()) { + try { + var entry = queue.take(); + Reconciler.Result result; + try { + log.debug("Reconciling request {} at {}", entry.getEntry(), nowSupplier.get()); + StopWatch watch = new StopWatch("Reconcile: " + entry.getEntry().name()); + watch.start("reconciliation"); + result = this.reconciler.reconcile(entry.getEntry()); + watch.stop(); + log.debug("Reconciled request: {} with result: {}", entry.getEntry(), result); + if (log.isDebugEnabled()) { + log.debug(watch.toString()); + } + } catch (Throwable t) { + log.error("Reconciler aborted with an error, re-enqueuing...", t); + result = new Reconciler.Result(true, null); + } finally { + queue.done(entry.getEntry()); + } + if (!result.reEnqueue()) { + continue; + } + var retryAfter = result.retryAfter(); + if (retryAfter == null) { + retryAfter = entry.getRetryAfter(); + if (retryAfter == null + || retryAfter.isNegative() + || retryAfter.isZero() + || retryAfter.compareTo(minDelay) < 0) { + // set min retry after + retryAfter = minDelay; + } else { + try { + // TODO Refactor the retryAfter with ratelimiter + retryAfter = retryAfter.multipliedBy(2); + } catch (ArithmeticException e) { + retryAfter = maxDelay; + } + } + if (retryAfter.compareTo(maxDelay) > 0) { + retryAfter = maxDelay; + } + } + queue.add( + new DelayedEntry<>(entry.getEntry(), retryAfter, nowSupplier)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("Controller {} interrupted", name); + } + } + log.info("Controller {} is stopped", name); + } + + @Override + public void dispose() { + disposed = true; + log.info("Disposing controller {}", name); + + synchronizer.dispose(); + + executor.shutdownNow(); + try { + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + log.warn("Wait timeout for controller {} shutdown", name); + } else { + log.info("Controller {} is disposed", name); + } + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for controller {} shutdown", name); + } finally { + queue.dispose(); + } + } + + @Override + public boolean isDisposed() { + return disposed; + } + + public boolean isStarted() { + return started; + } +} diff --git a/src/main/java/run/halo/app/extension/controller/DefaultDelayQueue.java b/src/main/java/run/halo/app/extension/controller/DefaultDelayQueue.java new file mode 100644 index 000000000..fbe939de9 --- /dev/null +++ b/src/main/java/run/halo/app/extension/controller/DefaultDelayQueue.java @@ -0,0 +1,85 @@ +package run.halo.app.extension.controller; + +import java.time.Duration; +import java.time.Instant; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.DelayQueue; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import run.halo.app.extension.controller.Reconciler.Request; + +@Slf4j +public class DefaultDelayQueue + extends DelayQueue> implements RequestQueue { + + private final Supplier nowSupplier; + + private volatile boolean disposed = false; + + private final Duration minDelay; + + private final Set processing; + + public DefaultDelayQueue(Supplier nowSupplier) { + this(nowSupplier, Duration.ZERO); + } + + public DefaultDelayQueue(Supplier nowSupplier, Duration minDelay) { + this.nowSupplier = nowSupplier; + this.minDelay = minDelay; + this.processing = new HashSet<>(); + } + + @Override + public boolean addImmediately(Request request) { + var delayedEntry = new DelayedEntry<>(request, minDelay, nowSupplier); + return offer(delayedEntry); + } + + @Override + public boolean add(DelayedEntry entry) { + if (entry.getRetryAfter().compareTo(minDelay) < 0) { + log.warn("Request {} will be retried after {} ms, but minimum delay is {} ms", + entry.getEntry(), entry.getRetryAfter().toMillis(), minDelay.toMillis()); + entry = new DelayedEntry<>(entry.getEntry(), minDelay, nowSupplier); + } + return super.add(entry); + } + + @Override + public DelayedEntry take() throws InterruptedException { + var entry = super.take(); + processing.add(entry.getEntry()); + return entry; + } + + @Override + public void done(Request request) { + processing.remove(request); + } + + @Override + public boolean offer(DelayedEntry entry) { + if (this.isDisposed() || processing.contains(entry.getEntry())) { + return false; + } + // remove the existing entry before adding the new one + // to refresh the delay. + this.remove(entry); + return super.offer(entry); + } + + @Override + public void dispose() { + this.disposed = true; + this.clear(); + this.processing.clear(); + } + + @Override + public boolean isDisposed() { + return this.disposed; + } + +} diff --git a/src/main/java/run/halo/app/extension/controller/ExtensionWatcher.java b/src/main/java/run/halo/app/extension/controller/ExtensionWatcher.java new file mode 100644 index 000000000..b5a5c0613 --- /dev/null +++ b/src/main/java/run/halo/app/extension/controller/ExtensionWatcher.java @@ -0,0 +1,66 @@ +package run.halo.app.extension.controller; + +import run.halo.app.extension.Extension; +import run.halo.app.extension.Watcher; +import run.halo.app.extension.WatcherPredicates; + +public class ExtensionWatcher implements Watcher { + + private final RequestQueue queue; + + private volatile boolean disposed = false; + + private Runnable disposeHook; + private final WatcherPredicates predicates; + + public ExtensionWatcher(RequestQueue queue, WatcherPredicates predicates) { + this.queue = queue; + this.predicates = predicates; + } + + @Override + public void onAdd(Extension extension) { + if (isDisposed() || !predicates.onAddPredicate().test(extension)) { + return; + } + // TODO filter the event + queue.addImmediately(new Reconciler.Request(extension.getMetadata().getName())); + } + + @Override + public void onUpdate(Extension oldExtension, Extension newExtension) { + if (isDisposed() || !predicates.onUpdatePredicate().test(oldExtension, newExtension)) { + return; + } + // TODO filter the event + queue.addImmediately(new Reconciler.Request(newExtension.getMetadata().getName())); + } + + @Override + public void onDelete(Extension extension) { + if (isDisposed() || !predicates.onDeletePredicate().test(extension)) { + return; + } + // TODO filter the event + queue.addImmediately(new Reconciler.Request(extension.getMetadata().getName())); + } + + @Override + public void registerDisposeHook(Runnable dispose) { + this.disposeHook = dispose; + } + + @Override + public void dispose() { + disposed = true; + if (this.disposeHook != null) { + this.disposeHook.run(); + } + } + + @Override + public boolean isDisposed() { + return this.disposed; + } + +} diff --git a/src/main/java/run/halo/app/extension/controller/Reconciler.java b/src/main/java/run/halo/app/extension/controller/Reconciler.java new file mode 100644 index 000000000..d7de6b3c2 --- /dev/null +++ b/src/main/java/run/halo/app/extension/controller/Reconciler.java @@ -0,0 +1,15 @@ +package run.halo.app.extension.controller; + +import java.time.Duration; + +public interface Reconciler { + + Result reconcile(Request request); + + record Request(String name) { + } + + record Result(boolean reEnqueue, Duration retryAfter) { + + } +} diff --git a/src/main/java/run/halo/app/extension/controller/RequestQueue.java b/src/main/java/run/halo/app/extension/controller/RequestQueue.java new file mode 100644 index 000000000..97a3e0740 --- /dev/null +++ b/src/main/java/run/halo/app/extension/controller/RequestQueue.java @@ -0,0 +1,83 @@ +package run.halo.app.extension.controller; + +import static run.halo.app.extension.controller.Reconciler.Request; + +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import reactor.core.Disposable; + +public interface RequestQueue extends Disposable { + + boolean addImmediately(Request request); + + boolean add(DelayedEntry entry); + + DelayedEntry take() throws InterruptedException; + + void done(Request request); + + class DelayedEntry implements Delayed { + + private final E entry; + + private final Instant readyAt; + + private final Supplier nowSupplier; + + private final Duration retryAfter; + + DelayedEntry(E entry, Duration retryAfter, Supplier nowSupplier) { + this.entry = entry; + this.readyAt = nowSupplier.get().plusMillis(retryAfter.toMillis()); + this.nowSupplier = nowSupplier; + this.retryAfter = retryAfter; + } + + public DelayedEntry(E entry, Instant readyAt, Supplier nowSupplier) { + this.entry = entry; + this.readyAt = readyAt; + this.nowSupplier = nowSupplier; + this.retryAfter = Duration.between(nowSupplier.get(), readyAt); + } + + @Override + public long getDelay(TimeUnit unit) { + Duration diff = Duration.between(nowSupplier.get(), readyAt); + return unit.convert(diff); + } + + public Duration getRetryAfter() { + return retryAfter; + } + + @Override + public int compareTo(Delayed o) { + return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + } + + public E getEntry() { + return entry; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DelayedEntry that = (DelayedEntry) o; + return Objects.equals(entry, that.entry); + } + + @Override + public int hashCode() { + return Objects.hash(entry); + } + } +} diff --git a/src/main/java/run/halo/app/extension/controller/RequestSynchronizer.java b/src/main/java/run/halo/app/extension/controller/RequestSynchronizer.java new file mode 100644 index 000000000..054bbdcc2 --- /dev/null +++ b/src/main/java/run/halo/app/extension/controller/RequestSynchronizer.java @@ -0,0 +1,71 @@ +package run.halo.app.extension.controller; + +import java.util.function.Predicate; +import lombok.extern.slf4j.Slf4j; +import reactor.core.Disposable; +import run.halo.app.extension.Extension; +import run.halo.app.extension.ExtensionClient; +import run.halo.app.extension.Watcher; + +@Slf4j +public class RequestSynchronizer implements Disposable { + + private final ExtensionClient client; + + private final Class type; + + private final boolean syncAllOnStart; + + private volatile boolean disposed = false; + + private volatile boolean started = false; + + private final Watcher watcher; + + private final Predicate listPredicate; + + public RequestSynchronizer(boolean syncAllOnStart, + ExtensionClient client, + Extension extension, + Watcher watcher, + Predicate listPredicate) { + this.syncAllOnStart = syncAllOnStart; + this.client = client; + this.type = extension.getClass(); + this.watcher = watcher; + if (listPredicate == null) { + listPredicate = e -> true; + } + this.listPredicate = listPredicate; + } + + public void start() { + if (isDisposed() || started) { + return; + } + log.info("Starting request({}) synchronizer...", type); + started = true; + + if (syncAllOnStart) { + client.list(type, listPredicate::test, null) + .forEach(watcher::onAdd); + } + client.watch(this.watcher); + log.info("Started request({}) synchronizer.", type); + } + + public boolean isStarted() { + return started; + } + + @Override + public void dispose() { + disposed = true; + watcher.dispose(); + } + + @Override + public boolean isDisposed() { + return this.disposed; + } +} diff --git a/src/main/java/run/halo/app/extension/store/ExtensionStoreClientJPAImpl.java b/src/main/java/run/halo/app/extension/store/ExtensionStoreClientJPAImpl.java index 83b77ef06..63e1c511c 100644 --- a/src/main/java/run/halo/app/extension/store/ExtensionStoreClientJPAImpl.java +++ b/src/main/java/run/halo/app/extension/store/ExtensionStoreClientJPAImpl.java @@ -4,6 +4,7 @@ import jakarta.persistence.EntityNotFoundException; import java.util.List; import java.util.Optional; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; /** * An implementation of ExtensionStoreClient using JPA. @@ -42,6 +43,7 @@ public class ExtensionStoreClientJPAImpl implements ExtensionStoreClient { } @Override + @Transactional public ExtensionStore delete(String name, Long version) { var extensionStore = repository.findById(name).orElseThrow(EntityNotFoundException::new); diff --git a/src/main/java/run/halo/app/security/SuperAdminInitializer.java b/src/main/java/run/halo/app/security/SuperAdminInitializer.java index 3bf790c1d..c78d2fe42 100644 --- a/src/main/java/run/halo/app/security/SuperAdminInitializer.java +++ b/src/main/java/run/halo/app/security/SuperAdminInitializer.java @@ -4,7 +4,7 @@ import java.time.Instant; import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; -import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationListener; import org.springframework.security.crypto.password.PasswordEncoder; import org.springframework.stereotype.Component; @@ -20,7 +20,7 @@ import run.halo.app.extension.Metadata; @Slf4j @Component -public class SuperAdminInitializer implements ApplicationListener { +public class SuperAdminInitializer implements ApplicationListener { private final ExtensionClient client; private final PasswordEncoder passwordEncoder; @@ -31,7 +31,7 @@ public class SuperAdminInitializer implements ApplicationListener { // do nothing if admin has been initialized }, () -> { diff --git a/src/main/resources/application-dev.yaml b/src/main/resources/application-dev.yaml index b61b31c29..b0b545046 100644 --- a/src/main/resources/application-dev.yaml +++ b/src/main/resources/application-dev.yaml @@ -53,7 +53,7 @@ springdoc: enabled: true show-login-endpoint: true show-actuator: true - use-management-port: true + use-management-port: false management: endpoints: diff --git a/src/test/java/run/halo/app/PathPrefixPredicateTest.java b/src/test/java/run/halo/app/PathPrefixPredicateTest.java index ab7588b1b..1447b4dc5 100644 --- a/src/test/java/run/halo/app/PathPrefixPredicateTest.java +++ b/src/test/java/run/halo/app/PathPrefixPredicateTest.java @@ -33,4 +33,5 @@ public class PathPrefixPredicateTest { class TestController { } + } diff --git a/src/test/java/run/halo/app/extension/DefaultExtensionClientTest.java b/src/test/java/run/halo/app/extension/DefaultExtensionClientTest.java index 975bd44cc..9d5e1cbe5 100644 --- a/src/test/java/run/halo/app/extension/DefaultExtensionClientTest.java +++ b/src/test/java/run/halo/app/extension/DefaultExtensionClientTest.java @@ -10,6 +10,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -21,6 +22,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import java.util.List; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; @@ -98,11 +101,6 @@ class DefaultExtensionClientTest { return Unstructured.OBJECT_MAPPER.readValue(extensionJson, Unstructured.class); } - @Test - void shouldFetchUnstructuredSuccessfully() { - - } - @Test void shouldThrowSchemeNotFoundExceptionWhenSchemeNotRegistered() { class UnRegisteredExtension extends AbstractExtension { @@ -349,4 +347,41 @@ class DefaultExtensionClientTest { verify(storeClient, times(1)).delete(any(), any()); } + @Nested + @DisplayName("Extension watcher test") + class WatcherTest { + + @Mock + Watcher watcher; + + @BeforeEach + void setUp() { + client.watch(watcher); + } + + @Test + void shouldWatchOnAddSuccessfully() { + doNothing().when(watcher).onAdd(any()); + shouldCreateSuccessfully(); + + verify(watcher, times(1)).onAdd(any()); + } + + @Test + void shouldWatchOnUpdateSuccessfully() { + doNothing().when(watcher).onUpdate(any(), any()); + shouldUpdateSuccessfully(); + + verify(watcher, times(1)).onUpdate(any(), any()); + } + + @Test + void shouldWatchOnDeleteSuccessfully() { + doNothing().when(watcher).onDelete(any()); + shouldDeleteSuccessfully(); + + verify(watcher, times(1)).onDelete(any()); + } + } + } \ No newline at end of file diff --git a/src/test/java/run/halo/app/extension/FakeExtension.java b/src/test/java/run/halo/app/extension/FakeExtension.java index 81ab39cf5..d0e5cb03b 100644 --- a/src/test/java/run/halo/app/extension/FakeExtension.java +++ b/src/test/java/run/halo/app/extension/FakeExtension.java @@ -6,4 +6,13 @@ package run.halo.app.extension; plural = "fakes", singular = "fake") public class FakeExtension extends AbstractExtension { + + public static FakeExtension createFake(String name) { + var metadata = new Metadata(); + metadata.setName(name); + var fake = new FakeExtension(); + fake.setMetadata(metadata); + return fake; + } + } diff --git a/src/test/java/run/halo/app/extension/controller/ControllerBuilderTest.java b/src/test/java/run/halo/app/extension/controller/ControllerBuilderTest.java new file mode 100644 index 000000000..ff86c14e8 --- /dev/null +++ b/src/test/java/run/halo/app/extension/controller/ControllerBuilderTest.java @@ -0,0 +1,100 @@ +package run.halo.app.extension.controller; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import run.halo.app.extension.ExtensionClient; +import run.halo.app.extension.FakeExtension; + +@ExtendWith(MockitoExtension.class) +class ControllerBuilderTest { + + @Mock + ExtensionClient client; + + @Test + void buildWithBlankName() { + assertThrows(IllegalArgumentException.class, + () -> new ControllerBuilder("", client).build()); + } + + @Test + void buildWithNullClient() { + assertThrows(IllegalArgumentException.class, + () -> new ControllerBuilder("fake-name", null).build()); + } + + @Test + void buildTest() { + assertThrows(IllegalArgumentException.class, + () -> new ControllerBuilder("fake-name", client) + .build(), + "Extension must not be null"); + assertThrows(IllegalArgumentException.class, + () -> new ControllerBuilder("fake-name", client) + .extension(new FakeExtension()) + .build(), + "Reconciler must not be null"); + + assertNotNull(fakeBuilder().build()); + + assertNotNull(fakeBuilder() + .syncAllOnStart(true) + .nowSupplier(Instant::now) + .minDelay(Duration.ofMillis(5)) + .maxDelay(Duration.ofSeconds(1000)) + .build()); + + assertNotNull(fakeBuilder() + .syncAllOnStart(true) + .minDelay(Duration.ofMillis(5)) + .maxDelay(Duration.ofSeconds(1000)) + .onAddPredicate(Objects::nonNull) + .onUpdatePredicate(Objects::equals) + .onDeletePredicate(Objects::nonNull) + .build() + ); + } + + @Test + void invalidMinDelayAndMaxDelay() { + assertThrows(IllegalArgumentException.class, + () -> fakeBuilder() + .minDelay(Duration.ofSeconds(2)) + .maxDelay(Duration.ofSeconds(1)) + .build(), + "Min delay must be less than or equal to max delay"); + + assertNotNull(fakeBuilder() + .minDelay(null) + .maxDelay(Duration.ofSeconds(1)) + .build()); + + assertNotNull(fakeBuilder() + .minDelay(Duration.ofSeconds(1)) + .maxDelay(null) + .build()); + + assertNotNull(fakeBuilder() + .minDelay(Duration.ofSeconds(-1)) + .build()); + + assertNotNull(fakeBuilder() + .maxDelay(Duration.ofSeconds(-1)) + .build()); + } + + ControllerBuilder fakeBuilder() { + return new ControllerBuilder("fake-name", client) + .extension(new FakeExtension()) + .reconciler(request -> new Reconciler.Result(false, null)); + } + +} \ No newline at end of file diff --git a/src/test/java/run/halo/app/extension/controller/DefaultControllerTest.java b/src/test/java/run/halo/app/extension/controller/DefaultControllerTest.java new file mode 100644 index 000000000..5d8e40d7f --- /dev/null +++ b/src/test/java/run/halo/app/extension/controller/DefaultControllerTest.java @@ -0,0 +1,224 @@ +package run.halo.app.extension.controller; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import run.halo.app.extension.controller.Reconciler.Request; +import run.halo.app.extension.controller.Reconciler.Result; +import run.halo.app.extension.controller.RequestQueue.DelayedEntry; + +@ExtendWith(MockitoExtension.class) +class DefaultControllerTest { + + @Mock + RequestQueue queue; + + @Mock + Reconciler reconciler; + + @Mock + RequestSynchronizer synchronizer; + + @Mock + ExecutorService executor; + + Instant now = Instant.now(); + + Duration minRetryAfter = Duration.ofMillis(100); + + Duration maxRetryAfter = Duration.ofSeconds(10); + + DefaultController controller; + + @BeforeEach + void setUp() { + controller = new DefaultController("fake-controller", reconciler, queue, synchronizer, + () -> now, minRetryAfter, maxRetryAfter, executor); + + assertFalse(controller.isDisposed()); + assertFalse(controller.isStarted()); + } + + @Test + void shouldReturnRightName() { + assertEquals("fake-controller", controller.getName()); + } + + @Test + void shouldRunCorrectlyIfReconcilerReturnsNoReEnqueue() throws InterruptedException { + when(queue.take()).thenReturn(new DelayedEntry<>( + new Request("fake-request"), Duration.ofSeconds(1), () -> now + )) + .thenThrow(InterruptedException.class); + when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(false, null)); + + controller.run(); + + verify(synchronizer, times(1)).start(); + verify(queue, times(2)).take(); + verify(queue, times(0)).add(any()); + verify(queue, times(1)).done(any()); + verify(reconciler, times(1)).reconcile(eq(new Request("fake-request"))); + } + + @Test + void shouldRunCorrectlyIfReconcilerReturnsReEnqueue() throws InterruptedException { + when(queue.take()).thenReturn(new DelayedEntry<>( + new Request("fake-request"), Duration.ofSeconds(1), () -> now + )) + .thenThrow(InterruptedException.class); + when(queue.add(any())).thenReturn(true); + when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null)); + + controller.run(); + + verify(synchronizer, times(1)).start(); + verify(queue, times(2)).take(); + verify(queue, times(1)).done(any()); + verify(queue, times(1)).add(argThat(de -> + de.getEntry().name().equals("fake-request") + && de.getRetryAfter().equals(Duration.ofSeconds(2)))); + verify(reconciler, times(1)).reconcile(any(Request.class)); + } + + @Test + void shouldReRunIfReconcilerThrowException() throws InterruptedException { + when(queue.take()).thenReturn(new DelayedEntry<>( + new Request("fake-request"), Duration.ofSeconds(1), () -> now + )) + .thenThrow(InterruptedException.class); + when(queue.add(any())).thenReturn(true); + when(reconciler.reconcile(any(Request.class))).thenThrow(RuntimeException.class); + + controller.run(); + + verify(synchronizer, times(1)).start(); + verify(queue, times(2)).take(); + verify(queue, times(1)).done(any()); + verify(queue, times(1)).add(argThat(de -> + de.getEntry().name().equals("fake-request") + && de.getRetryAfter().equals(Duration.ofSeconds(2)))); + verify(reconciler, times(1)).reconcile(any(Request.class)); + } + + @Test + void shouldSetMinRetryAfterWhenTakeZeroDelayedEntry() throws InterruptedException { + when(queue.take()).thenReturn(new DelayedEntry<>( + new Request("fake-request"), minRetryAfter.minusMillis(1), () -> now + )) + .thenThrow(InterruptedException.class); + when(queue.add(any())).thenReturn(true); + when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null)); + + controller.run(); + + verify(synchronizer, times(1)).start(); + verify(queue, times(2)).take(); + verify(queue, times(1)).done(any()); + verify(queue, times(1)).add(argThat(de -> + de.getEntry().name().equals("fake-request") + && de.getRetryAfter().equals(minRetryAfter))); + verify(reconciler, times(1)).reconcile(any(Request.class)); + } + + @Test + void shouldSetMaxRetryAfterWhenTakeGreaterThanMaxRetryAfterDelayedEntry() + throws InterruptedException { + when(queue.take()).thenReturn(new DelayedEntry<>( + new Request("fake-request"), maxRetryAfter.plusMillis(1), () -> now + )) + .thenThrow(InterruptedException.class); + when(queue.add(any())).thenReturn(true); + when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null)); + + controller.run(); + + verify(synchronizer, times(1)).start(); + verify(queue, times(2)).take(); + verify(queue, times(1)).done(any()); + verify(queue, times(1)).add(argThat(de -> + de.getEntry().name().equals("fake-request") + && de.getRetryAfter().equals(maxRetryAfter))); + verify(reconciler, times(1)).reconcile(any(Request.class)); + } + + @Test + void shouldDisposeCorrectly() throws InterruptedException { + when(executor.awaitTermination(anyLong(), any())).thenReturn(true); + + controller.dispose(); + + assertTrue(controller.isDisposed()); + assertFalse(controller.isStarted()); + + verify(synchronizer, times(1)).dispose(); + verify(queue, times(1)).dispose(); + verify(executor, times(1)).shutdownNow(); + verify(executor, times(1)).awaitTermination(anyLong(), any()); + } + + @Test + void shouldDisposeCorrectlyEvenIfTimeoutAwaitTermination() throws InterruptedException { + when(executor.awaitTermination(anyLong(), any())).thenThrow(InterruptedException.class); + + controller.dispose(); + + assertTrue(controller.isDisposed()); + assertFalse(controller.isStarted()); + + verify(synchronizer, times(1)).dispose(); + verify(queue, times(1)).dispose(); + verify(executor, times(1)).shutdownNow(); + verify(executor, times(1)).awaitTermination(anyLong(), any()); + } + + @Test + void shouldStartCorrectly() throws InterruptedException { + when(executor.submit(any(Runnable.class))).thenAnswer(invocation -> { + doNothing().when(synchronizer).start(); + when(queue.take()).thenThrow(InterruptedException.class); + + // invoke the task really + ((Runnable) invocation.getArgument(0)).run(); + return mock(Future.class); + }); + controller.start(); + + assertTrue(controller.isStarted()); + assertFalse(controller.isDisposed()); + + verify(executor, times(1)).submit(any(Runnable.class)); + verify(synchronizer, times(1)).start(); + verify(queue, times(1)).take(); + verify(reconciler, times(0)).reconcile(any()); + } + + @Test + void shouldNotStartWhenDisposed() { + controller.dispose(); + controller.start(); + assertFalse(controller.isStarted()); + assertTrue(controller.isDisposed()); + + verify(executor, times(0)).submit(any(Runnable.class)); + } +} \ No newline at end of file diff --git a/src/test/java/run/halo/app/extension/controller/DefaultDelayQueueTest.java b/src/test/java/run/halo/app/extension/controller/DefaultDelayQueueTest.java new file mode 100644 index 000000000..1d0fb6693 --- /dev/null +++ b/src/test/java/run/halo/app/extension/controller/DefaultDelayQueueTest.java @@ -0,0 +1,108 @@ +package run.halo.app.extension.controller; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import run.halo.app.extension.controller.Reconciler.Request; +import run.halo.app.extension.controller.RequestQueue.DelayedEntry; + +class DefaultDelayQueueTest { + + Instant now = Instant.now(); + + DefaultDelayQueue queue; + + final Duration minDelay = Duration.ofMillis(1); + + @BeforeEach + void setUp() { + queue = new DefaultDelayQueue(() -> now, minDelay); + } + + @Test + void addImmediatelyTest() { + var request = newRequest("fake-name"); + var added = queue.addImmediately(request); + assertTrue(added); + assertEquals(1, queue.size()); + var delayedEntry = queue.peek(); + assertNotNull(delayedEntry); + assertEquals(newRequest("fake-name"), delayedEntry.getEntry()); + assertEquals(minDelay, delayedEntry.getRetryAfter()); + assertEquals(minDelay.toMillis(), delayedEntry.getDelay(TimeUnit.MILLISECONDS)); + } + + @Test + void addWithDelaySmallerThanMinDelay() { + var request = newRequest("fake-name"); + var added = queue.add(new DelayedEntry<>(request, Duration.ofNanos(1), () -> now)); + assertTrue(added); + assertEquals(1, queue.size()); + var delayedEntry = queue.peek(); + assertNotNull(delayedEntry); + assertEquals(newRequest("fake-name"), delayedEntry.getEntry()); + assertEquals(minDelay, delayedEntry.getRetryAfter()); + assertEquals(minDelay.toMillis(), delayedEntry.getDelay(TimeUnit.MILLISECONDS)); + } + + @Test + void addWithDelayGreaterThanMinDelay() { + var request = newRequest("fake-name"); + var added = queue.add(new DelayedEntry<>(request, minDelay.plusMillis(1), () -> now)); + assertTrue(added); + assertEquals(1, queue.size()); + var delayedEntry = queue.peek(); + assertNotNull(delayedEntry); + assertEquals(newRequest("fake-name"), delayedEntry.getEntry()); + assertEquals(minDelay.plusMillis(1), delayedEntry.getRetryAfter()); + assertEquals(minDelay.plusMillis(1).toMillis(), + delayedEntry.getDelay(TimeUnit.MILLISECONDS)); + } + + @Test + void shouldNotAddAfterDisposing() { + assertFalse(queue.isDisposed()); + queue.dispose(); + assertTrue(queue.isDisposed()); + var request = newRequest("fake-name"); + var added = queue.add(new DelayedEntry<>(request, minDelay, () -> now)); + assertFalse(added); + assertEquals(0, queue.size()); + } + + @Test + void shouldNotAddRepeatedlyIfNotDone() throws InterruptedException { + var entrySpy = spy(new DelayedEntry<>(newRequest("fake-name"), minDelay, () -> now)); + + doReturn(0L).when(entrySpy).getDelay(any()); + + queue.add(entrySpy); + assertEquals(1, queue.size()); + assertEquals(entrySpy, queue.peek()); + queue.take(); + assertEquals(0, queue.size()); + + queue.add(entrySpy); + assertEquals(0, queue.size()); + + queue.done(newRequest("fake-name")); + queue.add(entrySpy); + assertEquals(1, queue.size()); + assertEquals(entrySpy, queue.peek()); + } + + Request newRequest(String name) { + return new Request(name); + } + +} \ No newline at end of file diff --git a/src/test/java/run/halo/app/extension/controller/DelayedEntryTest.java b/src/test/java/run/halo/app/extension/controller/DelayedEntryTest.java new file mode 100644 index 000000000..c0d619687 --- /dev/null +++ b/src/test/java/run/halo/app/extension/controller/DelayedEntryTest.java @@ -0,0 +1,59 @@ +package run.halo.app.extension.controller; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; +import run.halo.app.extension.controller.RequestQueue.DelayedEntry; + +class DelayedEntryTest { + + Instant now = Instant.now(); + + @Test + void createDelayedEntry() { + var delayedEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), () -> now); + assertEquals(100, delayedEntry.getDelay(TimeUnit.MILLISECONDS)); + assertEquals(Duration.ofMillis(100), delayedEntry.getRetryAfter()); + assertEquals("fake", delayedEntry.getEntry()); + + delayedEntry = new DelayedEntry<>("fake", now.plus(Duration.ofSeconds(1)), () -> now); + assertEquals(1000, delayedEntry.getDelay(TimeUnit.MILLISECONDS)); + assertEquals(Duration.ofMillis(1000), delayedEntry.getRetryAfter()); + } + + @Test + void compareWithGreaterDelay() { + var firstDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), () -> now); + var secondDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(200), () -> now); + + assertTrue(firstDelayEntry.compareTo(secondDelayEntry) < 0); + } + + @Test + void compareWithSameDelay() { + var firstDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), () -> now); + var secondDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), () -> now); + + assertEquals(0, firstDelayEntry.compareTo(secondDelayEntry)); + } + + @Test + void compareWithLessDelay() { + var firstDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(200), () -> now); + var secondDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), () -> now); + + assertTrue(firstDelayEntry.compareTo(secondDelayEntry) > 0); + } + + @Test + void shouldBeEqualWithNameOnly() { + var firstDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(200), () -> now); + var secondDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), Instant::now); + + assertEquals(firstDelayEntry, secondDelayEntry); + } +} \ No newline at end of file diff --git a/src/test/java/run/halo/app/extension/controller/ExtensionWatcherTest.java b/src/test/java/run/halo/app/extension/controller/ExtensionWatcherTest.java new file mode 100644 index 000000000..d1085b04c --- /dev/null +++ b/src/test/java/run/halo/app/extension/controller/ExtensionWatcherTest.java @@ -0,0 +1,132 @@ +package run.halo.app.extension.controller; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static run.halo.app.extension.FakeExtension.createFake; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import run.halo.app.extension.WatcherPredicates; + +@ExtendWith(MockitoExtension.class) +class ExtensionWatcherTest { + + @Mock + RequestQueue queue; + + @Mock + WatcherPredicates predicates; + + @InjectMocks + ExtensionWatcher watcher; + + @Test + void shouldAddExtensionWhenAddPredicateAlwaysTrue() { + when(predicates.onAddPredicate()).thenReturn(e -> true); + watcher.onAdd(createFake("fake-name")); + + verify(predicates, times(1)).onAddPredicate(); + verify(queue, times(1)).addImmediately( + argThat(request -> request.name().equals("fake-name"))); + verify(queue, times(0)).add(any()); + } + + @Test + void shouldNotAddExtensionWhenAddPredicateAlwaysFalse() { + when(predicates.onAddPredicate()).thenReturn(e -> false); + watcher.onAdd(createFake("fake-name")); + + verify(predicates, times(1)).onAddPredicate(); + verify(queue, times(0)).add(any()); + verify(queue, times(0)).addImmediately(any()); + } + + @Test + void shouldNotAddExtensionWhenWatcherIsDisposed() { + watcher.dispose(); + watcher.onAdd(createFake("fake-name")); + + verify(predicates, times(0)).onAddPredicate(); + verify(queue, times(0)).addImmediately(any()); + verify(queue, times(0)).add(any()); + } + + @Test + void shouldUpdateExtensionWhenUpdatePredicateAlwaysTrue() { + when(predicates.onUpdatePredicate()).thenReturn((e1, e2) -> true); + watcher.onUpdate(createFake("old-fake-name"), createFake("new-fake-name")); + + verify(predicates, times(1)).onUpdatePredicate(); + verify(queue, times(1)).addImmediately( + argThat(request -> request.name().equals("new-fake-name"))); + verify(queue, times(0)).add(any()); + } + + @Test + void shouldUpdateExtensionWhenUpdatePredicateAlwaysFalse() { + when(predicates.onUpdatePredicate()).thenReturn((e1, e2) -> false); + watcher.onUpdate(createFake("old-fake-name"), createFake("new-fake-name")); + + verify(predicates, times(1)).onUpdatePredicate(); + verify(queue, times(0)).add(any()); + verify(queue, times(0)).addImmediately(any()); + } + + @Test + void shouldNotUpdateExtensionWhenWatcherIsDisposed() { + watcher.dispose(); + watcher.onUpdate(createFake("old-fake-name"), createFake("new-fake-name")); + + verify(predicates, times(0)).onUpdatePredicate(); + verify(queue, times(0)).add(any()); + verify(queue, times(0)).addImmediately(any()); + } + + @Test + void shouldDeleteExtensionWhenDeletePredicateAlwaysTrue() { + when(predicates.onDeletePredicate()).thenReturn(e -> true); + watcher.onDelete(createFake("fake-name")); + + verify(predicates, times(1)).onDeletePredicate(); + verify(queue, times(1)).addImmediately( + argThat(request -> request.name().equals("fake-name"))); + verify(queue, times(0)).add(any()); + } + + @Test + void shouldDeleteExtensionWhenDeletePredicateAlwaysFalse() { + when(predicates.onDeletePredicate()).thenReturn(e -> false); + watcher.onDelete(createFake("fake-name")); + + verify(predicates, times(1)).onDeletePredicate(); + verify(queue, times(0)).add(any()); + verify(queue, times(0)).addImmediately(any()); + } + + @Test + void shouldNotDeleteExtensionWhenWatcherIsDisposed() { + watcher.dispose(); + watcher.onDelete(createFake("fake-name")); + + verify(predicates, times(0)).onDeletePredicate(); + verify(queue, times(0)).add(any()); + verify(queue, times(0)).addImmediately(any()); + } + + @Test + void shouldInvokeDisposeHookIfRegistered() { + var mockHook = mock(Runnable.class); + watcher.registerDisposeHook(mockHook); + verify(mockHook, times(0)).run(); + + watcher.dispose(); + verify(mockHook, times(1)).run(); + } +} \ No newline at end of file diff --git a/src/test/java/run/halo/app/extension/controller/RequestSynchronizerTest.java b/src/test/java/run/halo/app/extension/controller/RequestSynchronizerTest.java new file mode 100644 index 000000000..c427e91a4 --- /dev/null +++ b/src/test/java/run/halo/app/extension/controller/RequestSynchronizerTest.java @@ -0,0 +1,101 @@ +package run.halo.app.extension.controller; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static run.halo.app.extension.FakeExtension.createFake; + +import java.util.List; +import java.util.function.Predicate; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import run.halo.app.extension.Extension; +import run.halo.app.extension.ExtensionClient; +import run.halo.app.extension.FakeExtension; +import run.halo.app.extension.Watcher; + +@ExtendWith(MockitoExtension.class) +class RequestSynchronizerTest { + + @Mock + ExtensionClient client; + + @Mock + Watcher watcher; + + @Mock + Predicate listPredicate; + + RequestSynchronizer synchronizer; + + @BeforeEach + void setUp() { + synchronizer = + new RequestSynchronizer(true, client, new FakeExtension(), watcher, listPredicate); + assertFalse(synchronizer.isDisposed()); + assertFalse(synchronizer.isStarted()); + } + + @Test + void shouldStartCorrectlyWhenSyncingAllOnStart() { + when(client.list(same(FakeExtension.class), any(), any())).thenReturn( + List.of(createFake("fake-01"), createFake("fake-02"))); + + synchronizer.start(); + + assertTrue(synchronizer.isStarted()); + assertFalse(synchronizer.isDisposed()); + + verify(client, times(1)).list(same(FakeExtension.class), any(), any()); + verify(watcher, times(2)).onAdd(any()); + verify(client, times(1)).watch(same(watcher)); + } + + @Test + void shouldStartCorrectlyWhenNotSyncingAllOnStart() { + synchronizer = + new RequestSynchronizer(false, client, new FakeExtension(), watcher, listPredicate); + assertFalse(synchronizer.isDisposed()); + assertFalse(synchronizer.isStarted()); + + synchronizer.start(); + + assertTrue(synchronizer.isStarted()); + assertFalse(synchronizer.isDisposed()); + + verify(client, times(0)).list(any(), any(), any()); + verify(watcher, times(0)).onAdd(any()); + verify(client, times(1)).watch(any(Watcher.class)); + } + + @Test + void shouldDisposeCorrectly() { + synchronizer.start(); + assertFalse(synchronizer.isDisposed()); + assertTrue(synchronizer.isStarted()); + + synchronizer.dispose(); + + assertTrue(synchronizer.isDisposed()); + assertTrue(synchronizer.isStarted()); + verify(watcher, times(1)).dispose(); + } + + @Test + void shouldNotStartAfterDisposing() { + synchronizer.dispose(); + synchronizer.start(); + + verify(client, times(0)).list(any(), any(), any()); + verify(watcher, times(0)).onAdd(any()); + verify(client, times(0)).watch(any()); + } + +} \ No newline at end of file diff --git a/src/test/java/run/halo/app/security/SuperAdminInitializerTest.java b/src/test/java/run/halo/app/security/SuperAdminInitializerTest.java index e09f528ab..d5ba66a46 100644 --- a/src/test/java/run/halo/app/security/SuperAdminInitializerTest.java +++ b/src/test/java/run/halo/app/security/SuperAdminInitializerTest.java @@ -1,7 +1,6 @@ package run.halo.app.security; import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -31,7 +30,6 @@ class SuperAdminInitializerTest { @Test void checkSuperAdminInitialization() { - verify(client, times(1)).fetch(eq(User.class), eq("admin")); verify(client, times(1)).create(argThat(extension -> { if (extension instanceof User user) { return "admin".equals(user.getMetadata().getName());