mirror of https://github.com/halo-dev/halo
Implement Extension reconciliation mechanism (#2204)
parent
d7cfe4c4a5
commit
27435a1aeb
|
@ -5,6 +5,8 @@ import org.springframework.context.annotation.Bean;
|
|||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.reactive.function.server.RouterFunction;
|
||||
import org.springframework.web.reactive.function.server.ServerResponse;
|
||||
import run.halo.app.core.extension.User;
|
||||
import run.halo.app.core.extension.reconciler.UserReconciler;
|
||||
import run.halo.app.extension.DefaultExtensionClient;
|
||||
import run.halo.app.extension.DefaultSchemeManager;
|
||||
import run.halo.app.extension.DefaultSchemeWatcherManager;
|
||||
|
@ -14,6 +16,8 @@ import run.halo.app.extension.JSONExtensionConverter;
|
|||
import run.halo.app.extension.SchemeManager;
|
||||
import run.halo.app.extension.SchemeWatcherManager;
|
||||
import run.halo.app.extension.SchemeWatcherManager.SchemeWatcher;
|
||||
import run.halo.app.extension.controller.Controller;
|
||||
import run.halo.app.extension.controller.ControllerBuilder;
|
||||
import run.halo.app.extension.store.ExtensionStoreClient;
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
|
@ -40,4 +44,12 @@ public class ExtensionConfiguration {
|
|||
SchemeWatcherManager schemeWatcherManager() {
|
||||
return new DefaultSchemeWatcherManager();
|
||||
}
|
||||
|
||||
@Bean
|
||||
Controller userController(ExtensionClient client) {
|
||||
return new ControllerBuilder("user-controller", client)
|
||||
.reconciler(new UserReconciler(client))
|
||||
.extension(new User())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
package run.halo.app.core.extension.reconciler;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import run.halo.app.extension.ExtensionClient;
|
||||
import run.halo.app.extension.controller.Reconciler;
|
||||
|
||||
@Slf4j
|
||||
public class UserReconciler implements Reconciler {
|
||||
|
||||
private final ExtensionClient client;
|
||||
|
||||
public UserReconciler(ExtensionClient client) {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result reconcile(Request request) {
|
||||
//TODO Add reconciliation logic here for User extension.
|
||||
return new Result(false, null);
|
||||
}
|
||||
|
||||
}
|
|
@ -9,7 +9,6 @@ import org.springframework.data.domain.Page;
|
|||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.data.support.PageableExecutionUtils;
|
||||
import org.springframework.util.Assert;
|
||||
import run.halo.app.extension.store.ExtensionStore;
|
||||
import run.halo.app.extension.store.ExtensionStoreClient;
|
||||
|
||||
/**
|
||||
|
@ -24,12 +23,16 @@ public class DefaultExtensionClient implements ExtensionClient {
|
|||
|
||||
private final SchemeManager schemeManager;
|
||||
|
||||
private final Watcher.WatcherComposite watchers;
|
||||
|
||||
public DefaultExtensionClient(ExtensionStoreClient storeClient,
|
||||
ExtensionConverter converter,
|
||||
SchemeManager schemeManager) {
|
||||
this.storeClient = storeClient;
|
||||
this.converter = converter;
|
||||
this.schemeManager = schemeManager;
|
||||
|
||||
watchers = new Watcher.WatcherComposite();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -82,7 +85,9 @@ public class DefaultExtensionClient implements ExtensionClient {
|
|||
metadata.setCreationTimestamp(Instant.now());
|
||||
// extension.setMetadata(metadata);
|
||||
var extensionStore = converter.convertTo(extension);
|
||||
storeClient.create(extensionStore.getName(), extensionStore.getData());
|
||||
var createdStore = storeClient.create(extensionStore.getName(), extensionStore.getData());
|
||||
var createdExt = converter.convertFrom(extension.getClass(), createdStore);
|
||||
watchers.onAdd(createdExt);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,14 +95,23 @@ public class DefaultExtensionClient implements ExtensionClient {
|
|||
var extensionStore = converter.convertTo(extension);
|
||||
Assert.notNull(extension.getMetadata().getVersion(),
|
||||
"Extension version must not be null when updating");
|
||||
storeClient.update(extensionStore.getName(), extensionStore.getVersion(),
|
||||
var updatedStore = storeClient.update(extensionStore.getName(), extensionStore.getVersion(),
|
||||
extensionStore.getData());
|
||||
var updatedExt = converter.convertFrom(extension.getClass(), updatedStore);
|
||||
watchers.onUpdate(extension, updatedExt);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <E extends Extension> void delete(E extension) {
|
||||
ExtensionStore extensionStore = converter.convertTo(extension);
|
||||
storeClient.delete(extensionStore.getName(), extensionStore.getVersion());
|
||||
var extensionStore = converter.convertTo(extension);
|
||||
var deleteStore = storeClient.delete(extensionStore.getName(), extensionStore.getVersion());
|
||||
Extension deleteExt = converter.convertFrom(extension.getClass(), deleteStore);
|
||||
watchers.onDelete(deleteExt);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void watch(Watcher watcher) {
|
||||
this.watchers.addWatcher(watcher);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,8 +18,8 @@ public interface ExtensionClient {
|
|||
* Lists Extensions by Extension type, filter and sorter.
|
||||
*
|
||||
* @param type is the class type of Extension.
|
||||
* @param predicate filters the result.
|
||||
* @param comparator sorts the result.
|
||||
* @param predicate filters the reEnqueue.
|
||||
* @param comparator sorts the reEnqueue.
|
||||
* @param <E> is Extension type.
|
||||
* @return all filtered and sorted Extensions.
|
||||
*/
|
||||
|
@ -30,8 +30,8 @@ public interface ExtensionClient {
|
|||
* Lists Extensions by Extension type, filter, sorter and page info.
|
||||
*
|
||||
* @param type is the class type of Extension.
|
||||
* @param predicate filters the result.
|
||||
* @param comparator sorts the result.
|
||||
* @param predicate filters the reEnqueue.
|
||||
* @param comparator sorts the reEnqueue.
|
||||
* @param page is page number which starts from 0.
|
||||
* @param size is page size.
|
||||
* @param <E> is Extension type.
|
||||
|
@ -80,4 +80,6 @@ public interface ExtensionClient {
|
|||
*/
|
||||
<E extends Extension> void delete(E extension);
|
||||
|
||||
void watch(Watcher watcher);
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
package run.halo.app.extension;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import reactor.core.Disposable;
|
||||
|
||||
public interface Watcher extends Disposable {
|
||||
|
||||
default void onAdd(Extension extension) {
|
||||
// Do nothing here
|
||||
}
|
||||
|
||||
default void onUpdate(Extension oldExtension, Extension newExtension) {
|
||||
// Do nothing here
|
||||
}
|
||||
|
||||
default void onDelete(Extension extension) {
|
||||
// Do nothing here
|
||||
}
|
||||
|
||||
default void registerDisposeHook(Runnable dispose) {
|
||||
}
|
||||
|
||||
class WatcherComposite implements Watcher {
|
||||
|
||||
private final List<Watcher> watchers;
|
||||
|
||||
private volatile boolean disposed = false;
|
||||
|
||||
private Runnable disposeHook;
|
||||
|
||||
public WatcherComposite() {
|
||||
watchers = new CopyOnWriteArrayList<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAdd(Extension extension) {
|
||||
// TODO Deep copy extension and execute onAdd asynchronously
|
||||
watchers.forEach(watcher -> watcher.onAdd(extension));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(Extension oldExtension, Extension newExtension) {
|
||||
// TODO Deep copy extension and execute onUpdate asynchronously
|
||||
watchers.forEach(watcher -> watcher.onUpdate(oldExtension, newExtension));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDelete(Extension extension) {
|
||||
// TODO Deep copy extension and execute onDelete asynchronously
|
||||
watchers.forEach(watcher -> watcher.onDelete(extension));
|
||||
}
|
||||
|
||||
public void addWatcher(Watcher watcher) {
|
||||
if (!watcher.isDisposed() && !watchers.contains(watcher)) {
|
||||
watchers.add(watcher);
|
||||
watcher.registerDisposeHook(() -> removeWatcher(watcher));
|
||||
}
|
||||
}
|
||||
|
||||
public void removeWatcher(Watcher watcher) {
|
||||
watchers.remove(watcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerDisposeHook(Runnable dispose) {
|
||||
this.disposeHook = dispose;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
this.disposed = true;
|
||||
this.watchers.clear();
|
||||
if (this.disposeHook != null) {
|
||||
this.disposeHook.run();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDisposed() {
|
||||
return this.disposed;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,99 @@
|
|||
package run.halo.app.extension;
|
||||
|
||||
import java.util.function.BiPredicate;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class WatcherPredicates {
|
||||
|
||||
static final Predicate<Extension> EMPTY_PREDICATE = (e) -> true;
|
||||
|
||||
static final BiPredicate<Extension, Extension> EMPTY_BI_PREDICATE = (oldExt, newExt) -> true;
|
||||
private final Predicate<Extension> onAddPredicate;
|
||||
private final BiPredicate<Extension, Extension> onUpdatePredicate;
|
||||
private final Predicate<Extension> onDeletePredicate;
|
||||
|
||||
public WatcherPredicates(Predicate<Extension> onAddPredicate,
|
||||
BiPredicate<Extension, Extension> onUpdatePredicate,
|
||||
Predicate<Extension> onDeletePredicate) {
|
||||
this.onAddPredicate = onAddPredicate;
|
||||
this.onUpdatePredicate = onUpdatePredicate;
|
||||
this.onDeletePredicate = onDeletePredicate;
|
||||
}
|
||||
|
||||
public Predicate<Extension> onAddPredicate() {
|
||||
if (onAddPredicate == null) {
|
||||
return EMPTY_PREDICATE;
|
||||
}
|
||||
return onAddPredicate;
|
||||
}
|
||||
|
||||
public BiPredicate<Extension, Extension> onUpdatePredicate() {
|
||||
if (onUpdatePredicate == null) {
|
||||
return EMPTY_BI_PREDICATE;
|
||||
}
|
||||
return onUpdatePredicate;
|
||||
}
|
||||
|
||||
public Predicate<Extension> onDeletePredicate() {
|
||||
if (onDeletePredicate == null) {
|
||||
return EMPTY_PREDICATE;
|
||||
}
|
||||
return onDeletePredicate;
|
||||
}
|
||||
|
||||
public static final class Builder {
|
||||
|
||||
private Predicate<Extension> onAddPredicate;
|
||||
private BiPredicate<Extension, Extension> onUpdatePredicate;
|
||||
private Predicate<Extension> onDeletePredicate;
|
||||
|
||||
private GroupVersionKind gvk;
|
||||
|
||||
public Builder withGroupVersionKind(GroupVersionKind gvk) {
|
||||
this.gvk = gvk;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder onAddPredicate(Predicate<Extension> onAddPredicate) {
|
||||
this.onAddPredicate = onAddPredicate;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder onUpdatePredicate(
|
||||
BiPredicate<Extension, Extension> onUpdatePredicate) {
|
||||
this.onUpdatePredicate = onUpdatePredicate;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder onDeletePredicate(Predicate<Extension> onDeletePredicate) {
|
||||
this.onDeletePredicate = onDeletePredicate;
|
||||
return this;
|
||||
}
|
||||
|
||||
public WatcherPredicates build() {
|
||||
Predicate<Extension> gvkPredicate = EMPTY_PREDICATE;
|
||||
BiPredicate<Extension, Extension> gvkBiPredicate = EMPTY_BI_PREDICATE;
|
||||
if (gvk != null) {
|
||||
gvkPredicate = e -> gvk.equals(e.groupVersionKind());
|
||||
gvkBiPredicate = (oldE, newE) -> oldE.groupVersionKind().equals(gvk)
|
||||
&& newE.groupVersionKind().equals(gvk);
|
||||
}
|
||||
if (onAddPredicate == null) {
|
||||
onAddPredicate = EMPTY_PREDICATE;
|
||||
}
|
||||
if (onUpdatePredicate == null) {
|
||||
onUpdatePredicate = EMPTY_BI_PREDICATE;
|
||||
}
|
||||
if (onDeletePredicate == null) {
|
||||
onDeletePredicate = EMPTY_PREDICATE;
|
||||
}
|
||||
|
||||
onAddPredicate = gvkPredicate.and(onAddPredicate);
|
||||
onUpdatePredicate = gvkBiPredicate.and(onUpdatePredicate);
|
||||
onDeletePredicate = gvkPredicate.and(onDeletePredicate);
|
||||
|
||||
return new WatcherPredicates(onAddPredicate, onUpdatePredicate, onDeletePredicate);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import reactor.core.Disposable;
|
||||
|
||||
public interface Controller extends Disposable {
|
||||
|
||||
String getName();
|
||||
|
||||
void start();
|
||||
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.function.BiPredicate;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import org.springframework.util.Assert;
|
||||
import run.halo.app.extension.Extension;
|
||||
import run.halo.app.extension.ExtensionClient;
|
||||
import run.halo.app.extension.WatcherPredicates;
|
||||
|
||||
public class ControllerBuilder {
|
||||
|
||||
private final String name;
|
||||
|
||||
private Duration minDelay;
|
||||
|
||||
private Duration maxDelay;
|
||||
|
||||
private Reconciler reconciler;
|
||||
|
||||
private Supplier<Instant> nowSupplier;
|
||||
|
||||
private Extension extension;
|
||||
|
||||
private Predicate<Extension> onAddPredicate;
|
||||
|
||||
private Predicate<Extension> onDeletePredicate;
|
||||
|
||||
private BiPredicate<Extension, Extension> onUpdatePredicate;
|
||||
|
||||
private final ExtensionClient client;
|
||||
|
||||
private boolean syncAllOnStart = true;
|
||||
|
||||
public ControllerBuilder(String name, ExtensionClient client) {
|
||||
Assert.hasText(name, "Extension name is required");
|
||||
Assert.notNull(client, "Extension client must not be null");
|
||||
this.name = name;
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
public ControllerBuilder minDelay(Duration minDelay) {
|
||||
this.minDelay = minDelay;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ControllerBuilder maxDelay(Duration maxDelay) {
|
||||
this.maxDelay = maxDelay;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ControllerBuilder reconciler(Reconciler reconciler) {
|
||||
this.reconciler = reconciler;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ControllerBuilder nowSupplier(Supplier<Instant> nowSupplier) {
|
||||
this.nowSupplier = nowSupplier;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ControllerBuilder extension(Extension extension) {
|
||||
this.extension = extension;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ControllerBuilder onAddPredicate(Predicate<Extension> onAddPredicate) {
|
||||
this.onAddPredicate = onAddPredicate;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ControllerBuilder onDeletePredicate(Predicate<Extension> onDeletePredicate) {
|
||||
this.onDeletePredicate = onDeletePredicate;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ControllerBuilder onUpdatePredicate(
|
||||
BiPredicate<Extension, Extension> onUpdatePredicate) {
|
||||
this.onUpdatePredicate = onUpdatePredicate;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ControllerBuilder syncAllOnStart(boolean syncAllAtStart) {
|
||||
this.syncAllOnStart = syncAllAtStart;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Controller build() {
|
||||
if (nowSupplier == null) {
|
||||
nowSupplier = Instant::now;
|
||||
}
|
||||
if (minDelay == null || minDelay.isNegative() || minDelay.isZero()) {
|
||||
minDelay = Duration.ofMillis(5);
|
||||
}
|
||||
if (maxDelay == null || maxDelay.isNegative() || maxDelay.isZero()) {
|
||||
maxDelay = Duration.ofSeconds(1000);
|
||||
}
|
||||
Assert.isTrue(minDelay.compareTo(maxDelay) <= 0,
|
||||
"Min delay must be less than or equal to max delay");
|
||||
Assert.notNull(extension, "Extension must not be null");
|
||||
Assert.notNull(reconciler, "Reconciler must not be null");
|
||||
|
||||
var queue = new DefaultDelayQueue(nowSupplier, minDelay);
|
||||
var predicates = new WatcherPredicates.Builder()
|
||||
.withGroupVersionKind(extension.groupVersionKind())
|
||||
.onAddPredicate(onAddPredicate)
|
||||
.onUpdatePredicate(onUpdatePredicate)
|
||||
.onDeletePredicate(onDeletePredicate)
|
||||
.build();
|
||||
var watcher = new ExtensionWatcher(queue, predicates);
|
||||
var synchronizer = new RequestSynchronizer(syncAllOnStart,
|
||||
client,
|
||||
extension,
|
||||
watcher,
|
||||
predicates.onAddPredicate());
|
||||
return new DefaultController(name, reconciler, queue, synchronizer, minDelay, maxDelay);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ControllerManager implements ApplicationListener<ApplicationReadyEvent>,
|
||||
DisposableBean {
|
||||
|
||||
private final ApplicationContext applicationContext;
|
||||
|
||||
public ControllerManager(ApplicationContext applicationContext) {
|
||||
this.applicationContext = applicationContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationReadyEvent event) {
|
||||
applicationContext.getBeansOfType(Controller.class).values().forEach(Controller::start);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
var controllers =
|
||||
applicationContext.getBeansOfType(Controller.class).values();
|
||||
log.info("Shutting down {} controllers...", controllers.size());
|
||||
controllers.forEach(
|
||||
controller -> {
|
||||
try {
|
||||
controller.dispose();
|
||||
} catch (Throwable t) {
|
||||
log.error("Failed to dispose controller {}", controller.getName(), t);
|
||||
}
|
||||
});
|
||||
log.info("Shutdown {} controllers.", controllers.size());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,177 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.StopWatch;
|
||||
import run.halo.app.extension.controller.RequestQueue.DelayedEntry;
|
||||
|
||||
@Slf4j
|
||||
class DefaultController implements Controller {
|
||||
|
||||
private final String name;
|
||||
|
||||
private final Reconciler reconciler;
|
||||
|
||||
private final Supplier<Instant> nowSupplier;
|
||||
|
||||
private final RequestQueue queue;
|
||||
|
||||
private volatile boolean disposed = false;
|
||||
|
||||
private volatile boolean started = false;
|
||||
|
||||
private final ExecutorService executor;
|
||||
|
||||
private final RequestSynchronizer synchronizer;
|
||||
|
||||
private final Duration minDelay;
|
||||
|
||||
private final Duration maxDelay;
|
||||
|
||||
public DefaultController(String name,
|
||||
Reconciler reconciler,
|
||||
RequestQueue queue,
|
||||
RequestSynchronizer synchronizer,
|
||||
Duration minDelay,
|
||||
Duration maxDelay) {
|
||||
this(name, reconciler, queue, synchronizer, Instant::now, minDelay, maxDelay);
|
||||
}
|
||||
|
||||
public DefaultController(String name,
|
||||
Reconciler reconciler,
|
||||
RequestQueue queue,
|
||||
RequestSynchronizer synchronizer,
|
||||
Supplier<Instant> nowSupplier,
|
||||
Duration minDelay,
|
||||
Duration maxDelay,
|
||||
ExecutorService executor) {
|
||||
this.name = name;
|
||||
this.reconciler = reconciler;
|
||||
this.nowSupplier = nowSupplier;
|
||||
this.queue = queue;
|
||||
this.synchronizer = synchronizer;
|
||||
this.minDelay = minDelay;
|
||||
this.maxDelay = maxDelay;
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
public DefaultController(String name,
|
||||
Reconciler reconciler,
|
||||
RequestQueue queue,
|
||||
RequestSynchronizer synchronizer,
|
||||
Supplier<Instant> nowSupplier,
|
||||
Duration minDelay,
|
||||
Duration maxDelay) {
|
||||
this(name, reconciler, queue, synchronizer, nowSupplier, minDelay, maxDelay,
|
||||
Executors.newSingleThreadExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (isStarted() || isDisposed()) {
|
||||
log.warn("Controller {} is already started or disposed.", getName());
|
||||
return;
|
||||
}
|
||||
this.started = true;
|
||||
log.info("Starting controller {}", name);
|
||||
// TODO Make more workers run the reconciler.
|
||||
executor.submit(this::run);
|
||||
}
|
||||
|
||||
protected void run() {
|
||||
log.info("Controller {} started", name);
|
||||
synchronizer.start();
|
||||
while (!this.isDisposed() && !Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
var entry = queue.take();
|
||||
Reconciler.Result result;
|
||||
try {
|
||||
log.debug("Reconciling request {} at {}", entry.getEntry(), nowSupplier.get());
|
||||
StopWatch watch = new StopWatch("Reconcile: " + entry.getEntry().name());
|
||||
watch.start("reconciliation");
|
||||
result = this.reconciler.reconcile(entry.getEntry());
|
||||
watch.stop();
|
||||
log.debug("Reconciled request: {} with result: {}", entry.getEntry(), result);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(watch.toString());
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.error("Reconciler aborted with an error, re-enqueuing...", t);
|
||||
result = new Reconciler.Result(true, null);
|
||||
} finally {
|
||||
queue.done(entry.getEntry());
|
||||
}
|
||||
if (!result.reEnqueue()) {
|
||||
continue;
|
||||
}
|
||||
var retryAfter = result.retryAfter();
|
||||
if (retryAfter == null) {
|
||||
retryAfter = entry.getRetryAfter();
|
||||
if (retryAfter == null
|
||||
|| retryAfter.isNegative()
|
||||
|| retryAfter.isZero()
|
||||
|| retryAfter.compareTo(minDelay) < 0) {
|
||||
// set min retry after
|
||||
retryAfter = minDelay;
|
||||
} else {
|
||||
try {
|
||||
// TODO Refactor the retryAfter with ratelimiter
|
||||
retryAfter = retryAfter.multipliedBy(2);
|
||||
} catch (ArithmeticException e) {
|
||||
retryAfter = maxDelay;
|
||||
}
|
||||
}
|
||||
if (retryAfter.compareTo(maxDelay) > 0) {
|
||||
retryAfter = maxDelay;
|
||||
}
|
||||
}
|
||||
queue.add(
|
||||
new DelayedEntry<>(entry.getEntry(), retryAfter, nowSupplier));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.info("Controller {} interrupted", name);
|
||||
}
|
||||
}
|
||||
log.info("Controller {} is stopped", name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
disposed = true;
|
||||
log.info("Disposing controller {}", name);
|
||||
|
||||
synchronizer.dispose();
|
||||
|
||||
executor.shutdownNow();
|
||||
try {
|
||||
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
|
||||
log.warn("Wait timeout for controller {} shutdown", name);
|
||||
} else {
|
||||
log.info("Controller {} is disposed", name);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("Interrupted while waiting for controller {} shutdown", name);
|
||||
} finally {
|
||||
queue.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDisposed() {
|
||||
return disposed;
|
||||
}
|
||||
|
||||
public boolean isStarted() {
|
||||
return started;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.DelayQueue;
|
||||
import java.util.function.Supplier;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import run.halo.app.extension.controller.Reconciler.Request;
|
||||
|
||||
@Slf4j
|
||||
public class DefaultDelayQueue
|
||||
extends DelayQueue<DefaultDelayQueue.DelayedEntry<Request>> implements RequestQueue {
|
||||
|
||||
private final Supplier<Instant> nowSupplier;
|
||||
|
||||
private volatile boolean disposed = false;
|
||||
|
||||
private final Duration minDelay;
|
||||
|
||||
private final Set<Request> processing;
|
||||
|
||||
public DefaultDelayQueue(Supplier<Instant> nowSupplier) {
|
||||
this(nowSupplier, Duration.ZERO);
|
||||
}
|
||||
|
||||
public DefaultDelayQueue(Supplier<Instant> nowSupplier, Duration minDelay) {
|
||||
this.nowSupplier = nowSupplier;
|
||||
this.minDelay = minDelay;
|
||||
this.processing = new HashSet<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addImmediately(Request request) {
|
||||
var delayedEntry = new DelayedEntry<>(request, minDelay, nowSupplier);
|
||||
return offer(delayedEntry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean add(DelayedEntry<Request> entry) {
|
||||
if (entry.getRetryAfter().compareTo(minDelay) < 0) {
|
||||
log.warn("Request {} will be retried after {} ms, but minimum delay is {} ms",
|
||||
entry.getEntry(), entry.getRetryAfter().toMillis(), minDelay.toMillis());
|
||||
entry = new DelayedEntry<>(entry.getEntry(), minDelay, nowSupplier);
|
||||
}
|
||||
return super.add(entry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DelayedEntry<Request> take() throws InterruptedException {
|
||||
var entry = super.take();
|
||||
processing.add(entry.getEntry());
|
||||
return entry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void done(Request request) {
|
||||
processing.remove(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean offer(DelayedEntry<Request> entry) {
|
||||
if (this.isDisposed() || processing.contains(entry.getEntry())) {
|
||||
return false;
|
||||
}
|
||||
// remove the existing entry before adding the new one
|
||||
// to refresh the delay.
|
||||
this.remove(entry);
|
||||
return super.offer(entry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
this.disposed = true;
|
||||
this.clear();
|
||||
this.processing.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDisposed() {
|
||||
return this.disposed;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import run.halo.app.extension.Extension;
|
||||
import run.halo.app.extension.Watcher;
|
||||
import run.halo.app.extension.WatcherPredicates;
|
||||
|
||||
public class ExtensionWatcher implements Watcher {
|
||||
|
||||
private final RequestQueue queue;
|
||||
|
||||
private volatile boolean disposed = false;
|
||||
|
||||
private Runnable disposeHook;
|
||||
private final WatcherPredicates predicates;
|
||||
|
||||
public ExtensionWatcher(RequestQueue queue, WatcherPredicates predicates) {
|
||||
this.queue = queue;
|
||||
this.predicates = predicates;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAdd(Extension extension) {
|
||||
if (isDisposed() || !predicates.onAddPredicate().test(extension)) {
|
||||
return;
|
||||
}
|
||||
// TODO filter the event
|
||||
queue.addImmediately(new Reconciler.Request(extension.getMetadata().getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(Extension oldExtension, Extension newExtension) {
|
||||
if (isDisposed() || !predicates.onUpdatePredicate().test(oldExtension, newExtension)) {
|
||||
return;
|
||||
}
|
||||
// TODO filter the event
|
||||
queue.addImmediately(new Reconciler.Request(newExtension.getMetadata().getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDelete(Extension extension) {
|
||||
if (isDisposed() || !predicates.onDeletePredicate().test(extension)) {
|
||||
return;
|
||||
}
|
||||
// TODO filter the event
|
||||
queue.addImmediately(new Reconciler.Request(extension.getMetadata().getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerDisposeHook(Runnable dispose) {
|
||||
this.disposeHook = dispose;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
disposed = true;
|
||||
if (this.disposeHook != null) {
|
||||
this.disposeHook.run();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDisposed() {
|
||||
return this.disposed;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
public interface Reconciler {
|
||||
|
||||
Result reconcile(Request request);
|
||||
|
||||
record Request(String name) {
|
||||
}
|
||||
|
||||
record Result(boolean reEnqueue, Duration retryAfter) {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import static run.halo.app.extension.controller.Reconciler.Request;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import reactor.core.Disposable;
|
||||
|
||||
public interface RequestQueue extends Disposable {
|
||||
|
||||
boolean addImmediately(Request request);
|
||||
|
||||
boolean add(DelayedEntry<Request> entry);
|
||||
|
||||
DelayedEntry<Request> take() throws InterruptedException;
|
||||
|
||||
void done(Request request);
|
||||
|
||||
class DelayedEntry<E> implements Delayed {
|
||||
|
||||
private final E entry;
|
||||
|
||||
private final Instant readyAt;
|
||||
|
||||
private final Supplier<Instant> nowSupplier;
|
||||
|
||||
private final Duration retryAfter;
|
||||
|
||||
DelayedEntry(E entry, Duration retryAfter, Supplier<Instant> nowSupplier) {
|
||||
this.entry = entry;
|
||||
this.readyAt = nowSupplier.get().plusMillis(retryAfter.toMillis());
|
||||
this.nowSupplier = nowSupplier;
|
||||
this.retryAfter = retryAfter;
|
||||
}
|
||||
|
||||
public DelayedEntry(E entry, Instant readyAt, Supplier<Instant> nowSupplier) {
|
||||
this.entry = entry;
|
||||
this.readyAt = readyAt;
|
||||
this.nowSupplier = nowSupplier;
|
||||
this.retryAfter = Duration.between(nowSupplier.get(), readyAt);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDelay(TimeUnit unit) {
|
||||
Duration diff = Duration.between(nowSupplier.get(), readyAt);
|
||||
return unit.convert(diff);
|
||||
}
|
||||
|
||||
public Duration getRetryAfter() {
|
||||
return retryAfter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Delayed o) {
|
||||
return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
public E getEntry() {
|
||||
return entry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
DelayedEntry<?> that = (DelayedEntry<?>) o;
|
||||
return Objects.equals(entry, that.entry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(entry);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import java.util.function.Predicate;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import reactor.core.Disposable;
|
||||
import run.halo.app.extension.Extension;
|
||||
import run.halo.app.extension.ExtensionClient;
|
||||
import run.halo.app.extension.Watcher;
|
||||
|
||||
@Slf4j
|
||||
public class RequestSynchronizer implements Disposable {
|
||||
|
||||
private final ExtensionClient client;
|
||||
|
||||
private final Class<? extends Extension> type;
|
||||
|
||||
private final boolean syncAllOnStart;
|
||||
|
||||
private volatile boolean disposed = false;
|
||||
|
||||
private volatile boolean started = false;
|
||||
|
||||
private final Watcher watcher;
|
||||
|
||||
private final Predicate<Extension> listPredicate;
|
||||
|
||||
public RequestSynchronizer(boolean syncAllOnStart,
|
||||
ExtensionClient client,
|
||||
Extension extension,
|
||||
Watcher watcher,
|
||||
Predicate<Extension> listPredicate) {
|
||||
this.syncAllOnStart = syncAllOnStart;
|
||||
this.client = client;
|
||||
this.type = extension.getClass();
|
||||
this.watcher = watcher;
|
||||
if (listPredicate == null) {
|
||||
listPredicate = e -> true;
|
||||
}
|
||||
this.listPredicate = listPredicate;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
if (isDisposed() || started) {
|
||||
return;
|
||||
}
|
||||
log.info("Starting request({}) synchronizer...", type);
|
||||
started = true;
|
||||
|
||||
if (syncAllOnStart) {
|
||||
client.list(type, listPredicate::test, null)
|
||||
.forEach(watcher::onAdd);
|
||||
}
|
||||
client.watch(this.watcher);
|
||||
log.info("Started request({}) synchronizer.", type);
|
||||
}
|
||||
|
||||
public boolean isStarted() {
|
||||
return started;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
disposed = true;
|
||||
watcher.dispose();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDisposed() {
|
||||
return this.disposed;
|
||||
}
|
||||
}
|
|
@ -4,6 +4,7 @@ import jakarta.persistence.EntityNotFoundException;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
/**
|
||||
* An implementation of ExtensionStoreClient using JPA.
|
||||
|
@ -42,6 +43,7 @@ public class ExtensionStoreClientJPAImpl implements ExtensionStoreClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public ExtensionStore delete(String name, Long version) {
|
||||
var extensionStore =
|
||||
repository.findById(name).orElseThrow(EntityNotFoundException::new);
|
||||
|
|
|
@ -4,7 +4,7 @@ import java.time.Instant;
|
|||
import java.util.List;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.springframework.boot.context.event.ApplicationStartedEvent;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.security.crypto.password.PasswordEncoder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
@ -20,7 +20,7 @@ import run.halo.app.extension.Metadata;
|
|||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class SuperAdminInitializer implements ApplicationListener<ApplicationStartedEvent> {
|
||||
public class SuperAdminInitializer implements ApplicationListener<ApplicationReadyEvent> {
|
||||
|
||||
private final ExtensionClient client;
|
||||
private final PasswordEncoder passwordEncoder;
|
||||
|
@ -31,7 +31,7 @@ public class SuperAdminInitializer implements ApplicationListener<ApplicationSta
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationStartedEvent event) {
|
||||
public void onApplicationEvent(ApplicationReadyEvent event) {
|
||||
client.fetch(User.class, "admin").ifPresentOrElse(user -> {
|
||||
// do nothing if admin has been initialized
|
||||
}, () -> {
|
||||
|
|
|
@ -53,7 +53,7 @@ springdoc:
|
|||
enabled: true
|
||||
show-login-endpoint: true
|
||||
show-actuator: true
|
||||
use-management-port: true
|
||||
use-management-port: false
|
||||
|
||||
management:
|
||||
endpoints:
|
||||
|
|
|
@ -33,4 +33,5 @@ public class PathPrefixPredicateTest {
|
|||
class TestController {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import static org.mockito.ArgumentMatchers.any;
|
|||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.isA;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
@ -21,6 +22,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.InjectMocks;
|
||||
|
@ -98,11 +101,6 @@ class DefaultExtensionClientTest {
|
|||
return Unstructured.OBJECT_MAPPER.readValue(extensionJson, Unstructured.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldFetchUnstructuredSuccessfully() {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldThrowSchemeNotFoundExceptionWhenSchemeNotRegistered() {
|
||||
class UnRegisteredExtension extends AbstractExtension {
|
||||
|
@ -349,4 +347,41 @@ class DefaultExtensionClientTest {
|
|||
verify(storeClient, times(1)).delete(any(), any());
|
||||
}
|
||||
|
||||
@Nested
|
||||
@DisplayName("Extension watcher test")
|
||||
class WatcherTest {
|
||||
|
||||
@Mock
|
||||
Watcher watcher;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
client.watch(watcher);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldWatchOnAddSuccessfully() {
|
||||
doNothing().when(watcher).onAdd(any());
|
||||
shouldCreateSuccessfully();
|
||||
|
||||
verify(watcher, times(1)).onAdd(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldWatchOnUpdateSuccessfully() {
|
||||
doNothing().when(watcher).onUpdate(any(), any());
|
||||
shouldUpdateSuccessfully();
|
||||
|
||||
verify(watcher, times(1)).onUpdate(any(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldWatchOnDeleteSuccessfully() {
|
||||
doNothing().when(watcher).onDelete(any());
|
||||
shouldDeleteSuccessfully();
|
||||
|
||||
verify(watcher, times(1)).onDelete(any());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -6,4 +6,13 @@ package run.halo.app.extension;
|
|||
plural = "fakes",
|
||||
singular = "fake")
|
||||
public class FakeExtension extends AbstractExtension {
|
||||
|
||||
public static FakeExtension createFake(String name) {
|
||||
var metadata = new Metadata();
|
||||
metadata.setName(name);
|
||||
var fake = new FakeExtension();
|
||||
fake.setMetadata(metadata);
|
||||
return fake;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,100 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Objects;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import run.halo.app.extension.ExtensionClient;
|
||||
import run.halo.app.extension.FakeExtension;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class ControllerBuilderTest {
|
||||
|
||||
@Mock
|
||||
ExtensionClient client;
|
||||
|
||||
@Test
|
||||
void buildWithBlankName() {
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> new ControllerBuilder("", client).build());
|
||||
}
|
||||
|
||||
@Test
|
||||
void buildWithNullClient() {
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> new ControllerBuilder("fake-name", null).build());
|
||||
}
|
||||
|
||||
@Test
|
||||
void buildTest() {
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> new ControllerBuilder("fake-name", client)
|
||||
.build(),
|
||||
"Extension must not be null");
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> new ControllerBuilder("fake-name", client)
|
||||
.extension(new FakeExtension())
|
||||
.build(),
|
||||
"Reconciler must not be null");
|
||||
|
||||
assertNotNull(fakeBuilder().build());
|
||||
|
||||
assertNotNull(fakeBuilder()
|
||||
.syncAllOnStart(true)
|
||||
.nowSupplier(Instant::now)
|
||||
.minDelay(Duration.ofMillis(5))
|
||||
.maxDelay(Duration.ofSeconds(1000))
|
||||
.build());
|
||||
|
||||
assertNotNull(fakeBuilder()
|
||||
.syncAllOnStart(true)
|
||||
.minDelay(Duration.ofMillis(5))
|
||||
.maxDelay(Duration.ofSeconds(1000))
|
||||
.onAddPredicate(Objects::nonNull)
|
||||
.onUpdatePredicate(Objects::equals)
|
||||
.onDeletePredicate(Objects::nonNull)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
void invalidMinDelayAndMaxDelay() {
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> fakeBuilder()
|
||||
.minDelay(Duration.ofSeconds(2))
|
||||
.maxDelay(Duration.ofSeconds(1))
|
||||
.build(),
|
||||
"Min delay must be less than or equal to max delay");
|
||||
|
||||
assertNotNull(fakeBuilder()
|
||||
.minDelay(null)
|
||||
.maxDelay(Duration.ofSeconds(1))
|
||||
.build());
|
||||
|
||||
assertNotNull(fakeBuilder()
|
||||
.minDelay(Duration.ofSeconds(1))
|
||||
.maxDelay(null)
|
||||
.build());
|
||||
|
||||
assertNotNull(fakeBuilder()
|
||||
.minDelay(Duration.ofSeconds(-1))
|
||||
.build());
|
||||
|
||||
assertNotNull(fakeBuilder()
|
||||
.maxDelay(Duration.ofSeconds(-1))
|
||||
.build());
|
||||
}
|
||||
|
||||
ControllerBuilder fakeBuilder() {
|
||||
return new ControllerBuilder("fake-name", client)
|
||||
.extension(new FakeExtension())
|
||||
.reconciler(request -> new Reconciler.Result(false, null));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,224 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import run.halo.app.extension.controller.Reconciler.Request;
|
||||
import run.halo.app.extension.controller.Reconciler.Result;
|
||||
import run.halo.app.extension.controller.RequestQueue.DelayedEntry;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class DefaultControllerTest {
|
||||
|
||||
@Mock
|
||||
RequestQueue queue;
|
||||
|
||||
@Mock
|
||||
Reconciler reconciler;
|
||||
|
||||
@Mock
|
||||
RequestSynchronizer synchronizer;
|
||||
|
||||
@Mock
|
||||
ExecutorService executor;
|
||||
|
||||
Instant now = Instant.now();
|
||||
|
||||
Duration minRetryAfter = Duration.ofMillis(100);
|
||||
|
||||
Duration maxRetryAfter = Duration.ofSeconds(10);
|
||||
|
||||
DefaultController controller;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
controller = new DefaultController("fake-controller", reconciler, queue, synchronizer,
|
||||
() -> now, minRetryAfter, maxRetryAfter, executor);
|
||||
|
||||
assertFalse(controller.isDisposed());
|
||||
assertFalse(controller.isStarted());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldReturnRightName() {
|
||||
assertEquals("fake-controller", controller.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldRunCorrectlyIfReconcilerReturnsNoReEnqueue() throws InterruptedException {
|
||||
when(queue.take()).thenReturn(new DelayedEntry<>(
|
||||
new Request("fake-request"), Duration.ofSeconds(1), () -> now
|
||||
))
|
||||
.thenThrow(InterruptedException.class);
|
||||
when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(false, null));
|
||||
|
||||
controller.run();
|
||||
|
||||
verify(synchronizer, times(1)).start();
|
||||
verify(queue, times(2)).take();
|
||||
verify(queue, times(0)).add(any());
|
||||
verify(queue, times(1)).done(any());
|
||||
verify(reconciler, times(1)).reconcile(eq(new Request("fake-request")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldRunCorrectlyIfReconcilerReturnsReEnqueue() throws InterruptedException {
|
||||
when(queue.take()).thenReturn(new DelayedEntry<>(
|
||||
new Request("fake-request"), Duration.ofSeconds(1), () -> now
|
||||
))
|
||||
.thenThrow(InterruptedException.class);
|
||||
when(queue.add(any())).thenReturn(true);
|
||||
when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null));
|
||||
|
||||
controller.run();
|
||||
|
||||
verify(synchronizer, times(1)).start();
|
||||
verify(queue, times(2)).take();
|
||||
verify(queue, times(1)).done(any());
|
||||
verify(queue, times(1)).add(argThat(de ->
|
||||
de.getEntry().name().equals("fake-request")
|
||||
&& de.getRetryAfter().equals(Duration.ofSeconds(2))));
|
||||
verify(reconciler, times(1)).reconcile(any(Request.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldReRunIfReconcilerThrowException() throws InterruptedException {
|
||||
when(queue.take()).thenReturn(new DelayedEntry<>(
|
||||
new Request("fake-request"), Duration.ofSeconds(1), () -> now
|
||||
))
|
||||
.thenThrow(InterruptedException.class);
|
||||
when(queue.add(any())).thenReturn(true);
|
||||
when(reconciler.reconcile(any(Request.class))).thenThrow(RuntimeException.class);
|
||||
|
||||
controller.run();
|
||||
|
||||
verify(synchronizer, times(1)).start();
|
||||
verify(queue, times(2)).take();
|
||||
verify(queue, times(1)).done(any());
|
||||
verify(queue, times(1)).add(argThat(de ->
|
||||
de.getEntry().name().equals("fake-request")
|
||||
&& de.getRetryAfter().equals(Duration.ofSeconds(2))));
|
||||
verify(reconciler, times(1)).reconcile(any(Request.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldSetMinRetryAfterWhenTakeZeroDelayedEntry() throws InterruptedException {
|
||||
when(queue.take()).thenReturn(new DelayedEntry<>(
|
||||
new Request("fake-request"), minRetryAfter.minusMillis(1), () -> now
|
||||
))
|
||||
.thenThrow(InterruptedException.class);
|
||||
when(queue.add(any())).thenReturn(true);
|
||||
when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null));
|
||||
|
||||
controller.run();
|
||||
|
||||
verify(synchronizer, times(1)).start();
|
||||
verify(queue, times(2)).take();
|
||||
verify(queue, times(1)).done(any());
|
||||
verify(queue, times(1)).add(argThat(de ->
|
||||
de.getEntry().name().equals("fake-request")
|
||||
&& de.getRetryAfter().equals(minRetryAfter)));
|
||||
verify(reconciler, times(1)).reconcile(any(Request.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldSetMaxRetryAfterWhenTakeGreaterThanMaxRetryAfterDelayedEntry()
|
||||
throws InterruptedException {
|
||||
when(queue.take()).thenReturn(new DelayedEntry<>(
|
||||
new Request("fake-request"), maxRetryAfter.plusMillis(1), () -> now
|
||||
))
|
||||
.thenThrow(InterruptedException.class);
|
||||
when(queue.add(any())).thenReturn(true);
|
||||
when(reconciler.reconcile(any(Request.class))).thenReturn(new Result(true, null));
|
||||
|
||||
controller.run();
|
||||
|
||||
verify(synchronizer, times(1)).start();
|
||||
verify(queue, times(2)).take();
|
||||
verify(queue, times(1)).done(any());
|
||||
verify(queue, times(1)).add(argThat(de ->
|
||||
de.getEntry().name().equals("fake-request")
|
||||
&& de.getRetryAfter().equals(maxRetryAfter)));
|
||||
verify(reconciler, times(1)).reconcile(any(Request.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldDisposeCorrectly() throws InterruptedException {
|
||||
when(executor.awaitTermination(anyLong(), any())).thenReturn(true);
|
||||
|
||||
controller.dispose();
|
||||
|
||||
assertTrue(controller.isDisposed());
|
||||
assertFalse(controller.isStarted());
|
||||
|
||||
verify(synchronizer, times(1)).dispose();
|
||||
verify(queue, times(1)).dispose();
|
||||
verify(executor, times(1)).shutdownNow();
|
||||
verify(executor, times(1)).awaitTermination(anyLong(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldDisposeCorrectlyEvenIfTimeoutAwaitTermination() throws InterruptedException {
|
||||
when(executor.awaitTermination(anyLong(), any())).thenThrow(InterruptedException.class);
|
||||
|
||||
controller.dispose();
|
||||
|
||||
assertTrue(controller.isDisposed());
|
||||
assertFalse(controller.isStarted());
|
||||
|
||||
verify(synchronizer, times(1)).dispose();
|
||||
verify(queue, times(1)).dispose();
|
||||
verify(executor, times(1)).shutdownNow();
|
||||
verify(executor, times(1)).awaitTermination(anyLong(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldStartCorrectly() throws InterruptedException {
|
||||
when(executor.submit(any(Runnable.class))).thenAnswer(invocation -> {
|
||||
doNothing().when(synchronizer).start();
|
||||
when(queue.take()).thenThrow(InterruptedException.class);
|
||||
|
||||
// invoke the task really
|
||||
((Runnable) invocation.getArgument(0)).run();
|
||||
return mock(Future.class);
|
||||
});
|
||||
controller.start();
|
||||
|
||||
assertTrue(controller.isStarted());
|
||||
assertFalse(controller.isDisposed());
|
||||
|
||||
verify(executor, times(1)).submit(any(Runnable.class));
|
||||
verify(synchronizer, times(1)).start();
|
||||
verify(queue, times(1)).take();
|
||||
verify(reconciler, times(0)).reconcile(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotStartWhenDisposed() {
|
||||
controller.dispose();
|
||||
controller.start();
|
||||
assertFalse(controller.isStarted());
|
||||
assertTrue(controller.isDisposed());
|
||||
|
||||
verify(executor, times(0)).submit(any(Runnable.class));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,108 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import run.halo.app.extension.controller.Reconciler.Request;
|
||||
import run.halo.app.extension.controller.RequestQueue.DelayedEntry;
|
||||
|
||||
class DefaultDelayQueueTest {
|
||||
|
||||
Instant now = Instant.now();
|
||||
|
||||
DefaultDelayQueue queue;
|
||||
|
||||
final Duration minDelay = Duration.ofMillis(1);
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
queue = new DefaultDelayQueue(() -> now, minDelay);
|
||||
}
|
||||
|
||||
@Test
|
||||
void addImmediatelyTest() {
|
||||
var request = newRequest("fake-name");
|
||||
var added = queue.addImmediately(request);
|
||||
assertTrue(added);
|
||||
assertEquals(1, queue.size());
|
||||
var delayedEntry = queue.peek();
|
||||
assertNotNull(delayedEntry);
|
||||
assertEquals(newRequest("fake-name"), delayedEntry.getEntry());
|
||||
assertEquals(minDelay, delayedEntry.getRetryAfter());
|
||||
assertEquals(minDelay.toMillis(), delayedEntry.getDelay(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
void addWithDelaySmallerThanMinDelay() {
|
||||
var request = newRequest("fake-name");
|
||||
var added = queue.add(new DelayedEntry<>(request, Duration.ofNanos(1), () -> now));
|
||||
assertTrue(added);
|
||||
assertEquals(1, queue.size());
|
||||
var delayedEntry = queue.peek();
|
||||
assertNotNull(delayedEntry);
|
||||
assertEquals(newRequest("fake-name"), delayedEntry.getEntry());
|
||||
assertEquals(minDelay, delayedEntry.getRetryAfter());
|
||||
assertEquals(minDelay.toMillis(), delayedEntry.getDelay(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
void addWithDelayGreaterThanMinDelay() {
|
||||
var request = newRequest("fake-name");
|
||||
var added = queue.add(new DelayedEntry<>(request, minDelay.plusMillis(1), () -> now));
|
||||
assertTrue(added);
|
||||
assertEquals(1, queue.size());
|
||||
var delayedEntry = queue.peek();
|
||||
assertNotNull(delayedEntry);
|
||||
assertEquals(newRequest("fake-name"), delayedEntry.getEntry());
|
||||
assertEquals(minDelay.plusMillis(1), delayedEntry.getRetryAfter());
|
||||
assertEquals(minDelay.plusMillis(1).toMillis(),
|
||||
delayedEntry.getDelay(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotAddAfterDisposing() {
|
||||
assertFalse(queue.isDisposed());
|
||||
queue.dispose();
|
||||
assertTrue(queue.isDisposed());
|
||||
var request = newRequest("fake-name");
|
||||
var added = queue.add(new DelayedEntry<>(request, minDelay, () -> now));
|
||||
assertFalse(added);
|
||||
assertEquals(0, queue.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotAddRepeatedlyIfNotDone() throws InterruptedException {
|
||||
var entrySpy = spy(new DelayedEntry<>(newRequest("fake-name"), minDelay, () -> now));
|
||||
|
||||
doReturn(0L).when(entrySpy).getDelay(any());
|
||||
|
||||
queue.add(entrySpy);
|
||||
assertEquals(1, queue.size());
|
||||
assertEquals(entrySpy, queue.peek());
|
||||
queue.take();
|
||||
assertEquals(0, queue.size());
|
||||
|
||||
queue.add(entrySpy);
|
||||
assertEquals(0, queue.size());
|
||||
|
||||
queue.done(newRequest("fake-name"));
|
||||
queue.add(entrySpy);
|
||||
assertEquals(1, queue.size());
|
||||
assertEquals(entrySpy, queue.peek());
|
||||
}
|
||||
|
||||
Request newRequest(String name) {
|
||||
return new Request(name);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import run.halo.app.extension.controller.RequestQueue.DelayedEntry;
|
||||
|
||||
class DelayedEntryTest {
|
||||
|
||||
Instant now = Instant.now();
|
||||
|
||||
@Test
|
||||
void createDelayedEntry() {
|
||||
var delayedEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), () -> now);
|
||||
assertEquals(100, delayedEntry.getDelay(TimeUnit.MILLISECONDS));
|
||||
assertEquals(Duration.ofMillis(100), delayedEntry.getRetryAfter());
|
||||
assertEquals("fake", delayedEntry.getEntry());
|
||||
|
||||
delayedEntry = new DelayedEntry<>("fake", now.plus(Duration.ofSeconds(1)), () -> now);
|
||||
assertEquals(1000, delayedEntry.getDelay(TimeUnit.MILLISECONDS));
|
||||
assertEquals(Duration.ofMillis(1000), delayedEntry.getRetryAfter());
|
||||
}
|
||||
|
||||
@Test
|
||||
void compareWithGreaterDelay() {
|
||||
var firstDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), () -> now);
|
||||
var secondDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(200), () -> now);
|
||||
|
||||
assertTrue(firstDelayEntry.compareTo(secondDelayEntry) < 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void compareWithSameDelay() {
|
||||
var firstDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), () -> now);
|
||||
var secondDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), () -> now);
|
||||
|
||||
assertEquals(0, firstDelayEntry.compareTo(secondDelayEntry));
|
||||
}
|
||||
|
||||
@Test
|
||||
void compareWithLessDelay() {
|
||||
var firstDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(200), () -> now);
|
||||
var secondDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), () -> now);
|
||||
|
||||
assertTrue(firstDelayEntry.compareTo(secondDelayEntry) > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldBeEqualWithNameOnly() {
|
||||
var firstDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(200), () -> now);
|
||||
var secondDelayEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), Instant::now);
|
||||
|
||||
assertEquals(firstDelayEntry, secondDelayEntry);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static run.halo.app.extension.FakeExtension.createFake;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import run.halo.app.extension.WatcherPredicates;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class ExtensionWatcherTest {
|
||||
|
||||
@Mock
|
||||
RequestQueue queue;
|
||||
|
||||
@Mock
|
||||
WatcherPredicates predicates;
|
||||
|
||||
@InjectMocks
|
||||
ExtensionWatcher watcher;
|
||||
|
||||
@Test
|
||||
void shouldAddExtensionWhenAddPredicateAlwaysTrue() {
|
||||
when(predicates.onAddPredicate()).thenReturn(e -> true);
|
||||
watcher.onAdd(createFake("fake-name"));
|
||||
|
||||
verify(predicates, times(1)).onAddPredicate();
|
||||
verify(queue, times(1)).addImmediately(
|
||||
argThat(request -> request.name().equals("fake-name")));
|
||||
verify(queue, times(0)).add(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotAddExtensionWhenAddPredicateAlwaysFalse() {
|
||||
when(predicates.onAddPredicate()).thenReturn(e -> false);
|
||||
watcher.onAdd(createFake("fake-name"));
|
||||
|
||||
verify(predicates, times(1)).onAddPredicate();
|
||||
verify(queue, times(0)).add(any());
|
||||
verify(queue, times(0)).addImmediately(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotAddExtensionWhenWatcherIsDisposed() {
|
||||
watcher.dispose();
|
||||
watcher.onAdd(createFake("fake-name"));
|
||||
|
||||
verify(predicates, times(0)).onAddPredicate();
|
||||
verify(queue, times(0)).addImmediately(any());
|
||||
verify(queue, times(0)).add(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldUpdateExtensionWhenUpdatePredicateAlwaysTrue() {
|
||||
when(predicates.onUpdatePredicate()).thenReturn((e1, e2) -> true);
|
||||
watcher.onUpdate(createFake("old-fake-name"), createFake("new-fake-name"));
|
||||
|
||||
verify(predicates, times(1)).onUpdatePredicate();
|
||||
verify(queue, times(1)).addImmediately(
|
||||
argThat(request -> request.name().equals("new-fake-name")));
|
||||
verify(queue, times(0)).add(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldUpdateExtensionWhenUpdatePredicateAlwaysFalse() {
|
||||
when(predicates.onUpdatePredicate()).thenReturn((e1, e2) -> false);
|
||||
watcher.onUpdate(createFake("old-fake-name"), createFake("new-fake-name"));
|
||||
|
||||
verify(predicates, times(1)).onUpdatePredicate();
|
||||
verify(queue, times(0)).add(any());
|
||||
verify(queue, times(0)).addImmediately(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotUpdateExtensionWhenWatcherIsDisposed() {
|
||||
watcher.dispose();
|
||||
watcher.onUpdate(createFake("old-fake-name"), createFake("new-fake-name"));
|
||||
|
||||
verify(predicates, times(0)).onUpdatePredicate();
|
||||
verify(queue, times(0)).add(any());
|
||||
verify(queue, times(0)).addImmediately(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldDeleteExtensionWhenDeletePredicateAlwaysTrue() {
|
||||
when(predicates.onDeletePredicate()).thenReturn(e -> true);
|
||||
watcher.onDelete(createFake("fake-name"));
|
||||
|
||||
verify(predicates, times(1)).onDeletePredicate();
|
||||
verify(queue, times(1)).addImmediately(
|
||||
argThat(request -> request.name().equals("fake-name")));
|
||||
verify(queue, times(0)).add(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldDeleteExtensionWhenDeletePredicateAlwaysFalse() {
|
||||
when(predicates.onDeletePredicate()).thenReturn(e -> false);
|
||||
watcher.onDelete(createFake("fake-name"));
|
||||
|
||||
verify(predicates, times(1)).onDeletePredicate();
|
||||
verify(queue, times(0)).add(any());
|
||||
verify(queue, times(0)).addImmediately(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotDeleteExtensionWhenWatcherIsDisposed() {
|
||||
watcher.dispose();
|
||||
watcher.onDelete(createFake("fake-name"));
|
||||
|
||||
verify(predicates, times(0)).onDeletePredicate();
|
||||
verify(queue, times(0)).add(any());
|
||||
verify(queue, times(0)).addImmediately(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldInvokeDisposeHookIfRegistered() {
|
||||
var mockHook = mock(Runnable.class);
|
||||
watcher.registerDisposeHook(mockHook);
|
||||
verify(mockHook, times(0)).run();
|
||||
|
||||
watcher.dispose();
|
||||
verify(mockHook, times(1)).run();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
package run.halo.app.extension.controller;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.same;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static run.halo.app.extension.FakeExtension.createFake;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.function.Predicate;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import run.halo.app.extension.Extension;
|
||||
import run.halo.app.extension.ExtensionClient;
|
||||
import run.halo.app.extension.FakeExtension;
|
||||
import run.halo.app.extension.Watcher;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class RequestSynchronizerTest {
|
||||
|
||||
@Mock
|
||||
ExtensionClient client;
|
||||
|
||||
@Mock
|
||||
Watcher watcher;
|
||||
|
||||
@Mock
|
||||
Predicate<Extension> listPredicate;
|
||||
|
||||
RequestSynchronizer synchronizer;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
synchronizer =
|
||||
new RequestSynchronizer(true, client, new FakeExtension(), watcher, listPredicate);
|
||||
assertFalse(synchronizer.isDisposed());
|
||||
assertFalse(synchronizer.isStarted());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldStartCorrectlyWhenSyncingAllOnStart() {
|
||||
when(client.list(same(FakeExtension.class), any(), any())).thenReturn(
|
||||
List.of(createFake("fake-01"), createFake("fake-02")));
|
||||
|
||||
synchronizer.start();
|
||||
|
||||
assertTrue(synchronizer.isStarted());
|
||||
assertFalse(synchronizer.isDisposed());
|
||||
|
||||
verify(client, times(1)).list(same(FakeExtension.class), any(), any());
|
||||
verify(watcher, times(2)).onAdd(any());
|
||||
verify(client, times(1)).watch(same(watcher));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldStartCorrectlyWhenNotSyncingAllOnStart() {
|
||||
synchronizer =
|
||||
new RequestSynchronizer(false, client, new FakeExtension(), watcher, listPredicate);
|
||||
assertFalse(synchronizer.isDisposed());
|
||||
assertFalse(synchronizer.isStarted());
|
||||
|
||||
synchronizer.start();
|
||||
|
||||
assertTrue(synchronizer.isStarted());
|
||||
assertFalse(synchronizer.isDisposed());
|
||||
|
||||
verify(client, times(0)).list(any(), any(), any());
|
||||
verify(watcher, times(0)).onAdd(any());
|
||||
verify(client, times(1)).watch(any(Watcher.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldDisposeCorrectly() {
|
||||
synchronizer.start();
|
||||
assertFalse(synchronizer.isDisposed());
|
||||
assertTrue(synchronizer.isStarted());
|
||||
|
||||
synchronizer.dispose();
|
||||
|
||||
assertTrue(synchronizer.isDisposed());
|
||||
assertTrue(synchronizer.isStarted());
|
||||
verify(watcher, times(1)).dispose();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotStartAfterDisposing() {
|
||||
synchronizer.dispose();
|
||||
synchronizer.start();
|
||||
|
||||
verify(client, times(0)).list(any(), any(), any());
|
||||
verify(watcher, times(0)).onAdd(any());
|
||||
verify(client, times(0)).watch(any());
|
||||
}
|
||||
|
||||
}
|
|
@ -1,7 +1,6 @@
|
|||
package run.halo.app.security;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
|
@ -31,7 +30,6 @@ class SuperAdminInitializerTest {
|
|||
|
||||
@Test
|
||||
void checkSuperAdminInitialization() {
|
||||
verify(client, times(1)).fetch(eq(User.class), eq("admin"));
|
||||
verify(client, times(1)).create(argThat(extension -> {
|
||||
if (extension instanceof User user) {
|
||||
return "admin".equals(user.getMetadata().getName());
|
||||
|
|
Loading…
Reference in New Issue