Fix the problem that publishing post always fails (#3210)

#### What type of PR is this?

/kind improvement
/area core
/milestone 2.3.x

#### What this PR does / why we need it:

1. Refactor Queue in Controller. I add dirty set of request to avoid lost of new request from other places instead of reconciler.
2. Enhance equals and hashcode methods to ensure we can ignore updates of same extensions.

#### Which issue(s) this PR fixes:

Fixes https://github.com/halo-dev/halo/issues/2860

#### Special notes for your reviewer:

Please take some time to test publishing posts.

#### Does this PR introduce a user-facing change?

```release-note
修复发布文章时经常出现错误的问题
```
pull/3219/head
John Niang 2023-02-02 20:16:10 +08:00 committed by GitHub
parent 0fd023b8f6
commit 4636990d5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 258 additions and 129 deletions

View File

@ -10,8 +10,6 @@ import java.util.List;
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.pf4j.PluginState;
import org.springframework.lang.NonNull;
@ -93,8 +91,7 @@ public class Plugin extends AbstractExtension {
private String configMapName;
}
@Getter
@Setter
@Data
public static class License {
private String name;
private String url;

View File

@ -53,6 +53,7 @@ public class Role extends AbstractExtension {
* @since 2.0.0
*/
@Getter
@EqualsAndHashCode
public static class PolicyRule implements Comparable<PolicyRule> {
/**
* APIGroups is the name of the APIGroup that contains the resources.

View File

@ -12,7 +12,7 @@ import lombok.EqualsAndHashCode;
* @author johnniang
*/
@Data
@EqualsAndHashCode
@EqualsAndHashCode(exclude = "version")
public class Metadata implements MetadataOperator {
/**

View File

@ -7,6 +7,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.data.util.Predicates;
@ -136,20 +137,25 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
mono = get(extension.getClass(), extension.getMetadata().getName());
}
return mono
.map(old -> {
.flatMap(old -> {
// reset some fields
var oldMetadata = old.getMetadata();
var newMetadata = extension.getMetadata();
newMetadata.setCreationTimestamp(oldMetadata.getCreationTimestamp());
newMetadata.setDeletionTimestamp(oldMetadata.getDeletionTimestamp());
extension.setMetadata(newMetadata);
return converter.convertTo(extension);
if (Objects.equals(old, extension)) {
return Mono.empty();
}
return Mono.just(extension);
})
.map(converter::convertTo)
.flatMap(extensionStore -> client.update(extensionStore.getName(),
extensionStore.getVersion(),
extensionStore.getData()))
.map(updated -> converter.convertFrom((Class<E>) extension.getClass(), updated))
.doOnNext(updated -> watchers.onUpdate(extension, updated));
.doOnNext(updated -> watchers.onUpdate(extension, updated))
.switchIfEmpty(Mono.defer(() -> Mono.just(extension)));
}
@Override

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import lombok.EqualsAndHashCode;
/**
* Unstructured is a generic Extension, which wraps ObjectNode to maintain the Extension data, like
@ -65,6 +66,7 @@ public class Unstructured implements Extension {
return new UnstructuredMetadata();
}
@EqualsAndHashCode(exclude = "version")
class UnstructuredMetadata implements MetadataOperator {
@Override

View File

@ -106,7 +106,7 @@ public class ControllerBuilder {
Assert.notNull(extension, "Extension must not be null");
Assert.notNull(reconciler, "Reconciler must not be null");
var queue = new DefaultDelayQueue<Request>(nowSupplier, minDelay);
var queue = new DefaultQueue<Request>(nowSupplier, minDelay);
var predicates = new WatcherPredicates.Builder()
.withGroupVersionKind(extension.groupVersionKind())
.onAddPredicate(onAddPredicate)

View File

@ -1,85 +0,0 @@
package run.halo.app.extension.controller;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.DelayQueue;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DefaultDelayQueue<R>
extends DelayQueue<DefaultDelayQueue.DelayedEntry<R>> implements RequestQueue<R> {
private final Supplier<Instant> nowSupplier;
private volatile boolean disposed = false;
private final Duration minDelay;
private final Set<R> processing;
public DefaultDelayQueue(Supplier<Instant> nowSupplier) {
this(nowSupplier, Duration.ZERO);
}
public DefaultDelayQueue(Supplier<Instant> nowSupplier, Duration minDelay) {
this.nowSupplier = nowSupplier;
this.minDelay = minDelay;
this.processing = new HashSet<>();
}
@Override
public boolean addImmediately(R request) {
log.debug("Adding request {} immediately", request);
var delayedEntry = new DelayedEntry<>(request, minDelay, nowSupplier);
return offer(delayedEntry);
}
@Override
public boolean add(DelayedEntry<R> entry) {
if (entry.getRetryAfter().compareTo(minDelay) < 0) {
log.warn("Request {} will be retried after {} ms, but minimum delay is {} ms",
entry.getEntry(), entry.getRetryAfter().toMillis(), minDelay.toMillis());
entry = new DelayedEntry<>(entry.getEntry(), minDelay, nowSupplier);
}
return super.add(entry);
}
@Override
public DelayedEntry<R> take() throws InterruptedException {
var entry = super.take();
processing.add(entry.getEntry());
return entry;
}
@Override
public void done(R request) {
processing.remove(request);
}
@Override
public boolean offer(DelayedEntry<R> entry) {
if (this.isDisposed() || processing.contains(entry.getEntry())) {
return false;
}
// remove the existing entry before adding the new one
// to refresh the delay.
this.remove(entry);
return super.offer(entry);
}
@Override
public void dispose() {
this.disposed = true;
this.clear();
this.processing.clear();
}
@Override
public boolean isDisposed() {
return this.disposed;
}
}

View File

@ -0,0 +1,141 @@
package run.halo.app.extension.controller;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DefaultQueue<R> implements RequestQueue<R> {
private final Lock lock;
private final DelayQueue<DelayedEntry<R>> queue;
private final Supplier<Instant> nowSupplier;
private volatile boolean disposed = false;
private final Duration minDelay;
private final Set<R> processing;
private final Set<R> dirty;
public DefaultQueue(Supplier<Instant> nowSupplier) {
this(nowSupplier, Duration.ZERO);
}
public DefaultQueue(Supplier<Instant> nowSupplier, Duration minDelay) {
this.lock = new ReentrantLock();
this.nowSupplier = nowSupplier;
this.minDelay = minDelay;
this.processing = new HashSet<>();
this.dirty = new HashSet<>();
this.queue = new DelayQueue<>();
}
@Override
public boolean addImmediately(R request) {
log.debug("Adding request {} immediately", request);
var delayedEntry = new DelayedEntry<>(request, minDelay, nowSupplier);
return add(delayedEntry);
}
@Override
public boolean add(DelayedEntry<R> entry) {
lock.lock();
try {
if (isDisposed()) {
return false;
}
log.debug("Adding request {} after {}", entry.getEntry(), entry.getRetryAfter());
if (entry.getRetryAfter().compareTo(minDelay) < 0) {
log.warn("Request {} will be retried after {} ms, but minimum delay is {} ms",
entry.getEntry(), entry.getRetryAfter().toMillis(), minDelay.toMillis());
entry = new DelayedEntry<>(entry.getEntry(), minDelay, nowSupplier);
}
if (dirty.contains(entry.getEntry())) {
return false;
}
dirty.add(entry.getEntry());
if (processing.contains(entry.getEntry())) {
return false;
}
boolean added = queue.add(entry);
log.debug("Added request {} after {}", entry.getEntry(), entry.getRetryAfter());
return added;
} finally {
lock.unlock();
}
}
@Override
public DelayedEntry<R> take() throws InterruptedException {
var entry = queue.take();
log.debug("Take request {} at {}", entry.getEntry(), Instant.now());
lock.lockInterruptibly();
try {
if (isDisposed()) {
throw new InterruptedException(
"Queue has been disposed. Cannot take any elements now");
}
processing.add(entry.getEntry());
dirty.remove(entry.getEntry());
return entry;
} finally {
lock.unlock();
}
}
@Override
public void done(R request) {
lock.lock();
try {
if (isDisposed()) {
return;
}
processing.remove(request);
if (dirty.contains(request)) {
queue.add(new DelayedEntry<>(request, minDelay, nowSupplier));
}
} finally {
lock.unlock();
}
}
@Override
public long size() {
return queue.size();
}
@Override
public DelayedEntry<R> peek() {
return queue.peek();
}
@Override
public void dispose() {
lock.lock();
try {
disposed = true;
queue.clear();
processing.clear();
dirty.clear();
} finally {
lock.unlock();
}
}
@Override
public boolean isDisposed() {
return this.disposed;
}
}

View File

@ -18,6 +18,10 @@ public interface RequestQueue<E> extends Disposable {
void done(E request);
long size();
DelayedEntry<E> peek();
class DelayedEntry<E> implements Delayed {
private final E entry;

View File

@ -13,7 +13,7 @@ import run.halo.app.extension.SchemeManager;
import run.halo.app.extension.controller.Controller;
import run.halo.app.extension.controller.ControllerBuilder;
import run.halo.app.extension.controller.DefaultController;
import run.halo.app.extension.controller.DefaultDelayQueue;
import run.halo.app.extension.controller.DefaultQueue;
import run.halo.app.extension.controller.Reconciler;
import run.halo.app.extension.store.ExtensionStoreClient;
@ -55,7 +55,7 @@ class GcReconciler implements Reconciler<GcRequest> {
@Override
public Controller setupWith(ControllerBuilder builder) {
var queue = new DefaultDelayQueue<GcRequest>(Instant::now, Duration.ofMillis(500));
var queue = new DefaultQueue<GcRequest>(Instant::now, Duration.ofMillis(500));
var synchronizer = new GcSynchronizer(client, queue, schemeManager);
return new DefaultController<>(
"garbage-collector-controller",

View File

@ -1,9 +1,9 @@
package run.halo.app.infra;
import java.util.AbstractCollection;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Objects;
import java.util.function.Consumer;
import org.springframework.lang.NonNull;
@ -20,7 +20,7 @@ import org.springframework.lang.NonNull;
*/
public class ConditionList extends AbstractCollection<Condition> {
private static final int EVICT_THRESHOLD = 20;
private final Deque<Condition> conditions = new ArrayDeque<>();
private final Deque<Condition> conditions = new LinkedList<>();
@Override
public boolean add(@NonNull Condition condition) {
@ -113,4 +113,21 @@ public class ConditionList extends AbstractCollection<Condition> {
public void forEach(Consumer<? super Condition> action) {
conditions.forEach(action);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ConditionList that = (ConditionList) o;
return Objects.equals(conditions, that.conditions);
}
@Override
public int hashCode() {
return Objects.hash(conditions);
}
}

View File

@ -16,7 +16,7 @@ import run.halo.app.extension.ExtensionClient;
import run.halo.app.extension.controller.Controller;
import run.halo.app.extension.controller.ControllerBuilder;
import run.halo.app.extension.controller.DefaultController;
import run.halo.app.extension.controller.DefaultDelayQueue;
import run.halo.app.extension.controller.DefaultQueue;
import run.halo.app.extension.controller.Reconciler;
import run.halo.app.extension.controller.RequestQueue;
@ -37,7 +37,7 @@ public class ReplyEventReconciler implements Reconciler<ReplyEvent>, SmartLifecy
public ReplyEventReconciler(ExtensionClient client) {
this.client = client;
replyEventQueue = new DefaultDelayQueue<>(Instant::now);
replyEventQueue = new DefaultQueue<>(Instant::now);
replyEventController = this.setupWith(null);
}

View File

@ -17,7 +17,7 @@ import run.halo.app.extension.ExtensionClient;
import run.halo.app.extension.controller.Controller;
import run.halo.app.extension.controller.ControllerBuilder;
import run.halo.app.extension.controller.DefaultController;
import run.halo.app.extension.controller.DefaultDelayQueue;
import run.halo.app.extension.controller.DefaultQueue;
import run.halo.app.extension.controller.Reconciler;
import run.halo.app.extension.controller.RequestQueue;
@ -41,7 +41,7 @@ public class VisitedEventReconciler
public VisitedEventReconciler(ExtensionClient client) {
this.client = client;
visitedEventQueue = new DefaultDelayQueue<>(Instant::now);
visitedEventQueue = new DefaultQueue<>(Instant::now);
visitedEventController = this.setupWith(null);
}

View File

@ -15,7 +15,7 @@ import run.halo.app.extension.ExtensionClient;
import run.halo.app.extension.controller.Controller;
import run.halo.app.extension.controller.ControllerBuilder;
import run.halo.app.extension.controller.DefaultController;
import run.halo.app.extension.controller.DefaultDelayQueue;
import run.halo.app.extension.controller.DefaultQueue;
import run.halo.app.extension.controller.Reconciler;
import run.halo.app.extension.controller.RequestQueue;
@ -36,7 +36,7 @@ public class VotedEventReconciler implements Reconciler<VotedEvent>, SmartLifecy
public VotedEventReconciler(ExtensionClient client) {
this.client = client;
votedEventQueue = new DefaultDelayQueue<>(Instant::now);
votedEventQueue = new DefaultQueue<>(Instant::now);
votedEventController = this.setupWith(null);
}

View File

@ -15,7 +15,7 @@ import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.controller.Controller;
import run.halo.app.extension.controller.ControllerBuilder;
import run.halo.app.extension.controller.DefaultController;
import run.halo.app.extension.controller.DefaultDelayQueue;
import run.halo.app.extension.controller.DefaultQueue;
import run.halo.app.extension.controller.Reconciler;
import run.halo.app.extension.controller.RequestQueue;
import run.halo.app.plugin.event.PluginCreatedEvent;
@ -42,7 +42,7 @@ public class PluginCreatedEventReconciler
public PluginCreatedEventReconciler(ReactiveExtensionClient client) {
this.client = client;
pluginEventQueue = new DefaultDelayQueue<>(Instant::now);
pluginEventQueue = new DefaultQueue<>(Instant::now);
pluginEventController = this.setupWith(null);
}

View File

@ -17,7 +17,7 @@ import run.halo.app.event.post.PostUnpublishedEvent;
import run.halo.app.extension.controller.Controller;
import run.halo.app.extension.controller.ControllerBuilder;
import run.halo.app.extension.controller.DefaultController;
import run.halo.app.extension.controller.DefaultDelayQueue;
import run.halo.app.extension.controller.DefaultQueue;
import run.halo.app.extension.controller.Reconciler;
import run.halo.app.extension.controller.RequestQueue;
import run.halo.app.plugin.extensionpoint.ExtensionGetter;
@ -43,7 +43,7 @@ public class PostEventReconciler implements Reconciler<PostEvent>, SmartLifecycl
this.extensionGetter = extensionGetter;
this.postFinder = postFinder;
postEventQueue = new DefaultDelayQueue<>(Instant::now);
postEventQueue = new DefaultQueue<>(Instant::now);
postEventController = this.setupWith(null);
}

View File

@ -21,6 +21,7 @@ import static run.halo.app.extension.GroupVersionKind.fromAPIVersionAndKind;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
@ -395,6 +396,7 @@ class ReactiveExtensionClientTest {
@Test
void shouldUpdateSuccessfully() {
var fake = createFakeExtension("fake", 2L);
fake.getMetadata().setLabels(Map.of("new", "true"));
var storeName = "/registry/fake.halo.run/fakes/fake";
when(converter.convertTo(any())).thenReturn(
createExtensionStore(storeName, 2L));
@ -402,15 +404,45 @@ class ReactiveExtensionClientTest {
Mono.just(createExtensionStore(storeName, 2L)));
when(storeClient.fetchByName(storeName)).thenReturn(
Mono.just(createExtensionStore(storeName, 1L)));
when(converter.convertFrom(same(FakeExtension.class), any())).thenReturn(fake);
var oldFake = createFakeExtension("fake", 2L);
oldFake.getMetadata().setLabels(Map.of("old", "true"));
var updatedFake = createFakeExtension("fake", 3L);
updatedFake.getMetadata().setLabels(Map.of("updated", "true"));
when(converter.convertFrom(same(FakeExtension.class), any()))
.thenReturn(oldFake)
.thenReturn(updatedFake);
StepVerifier.create(client.update(fake))
.expectNext(updatedFake)
.verifyComplete();
verify(storeClient).fetchByName(storeName);
verify(converter).convertTo(eq(fake));
verify(converter, times(2)).convertFrom(same(FakeExtension.class), any());
verify(storeClient)
.update(eq("/registry/fake.halo.run/fakes/fake"), eq(2L), any());
}
@Test
void shouldNotUpdateIfExtensionNotChange() {
var fake = createFakeExtension("fake", 2L);
var storeName = "/registry/fake.halo.run/fakes/fake";
when(storeClient.fetchByName(storeName)).thenReturn(
Mono.just(createExtensionStore(storeName, 1L)));
var oldFake = createFakeExtension("fake", 2L);
when(converter.convertFrom(same(FakeExtension.class), any())).thenReturn(oldFake);
StepVerifier.create(client.update(fake))
.expectNext(fake)
.verifyComplete();
verify(converter, times(1)).convertTo(eq(fake));
verify(storeClient, times(1))
.update(eq("/registry/fake.halo.run/fakes/fake"), eq(2L), any());
verify(storeClient).fetchByName(storeName);
verify(converter).convertFrom(same(FakeExtension.class), any());
verify(converter, never()).convertTo(any());
verify(storeClient, never()).update(any(), any(), any());
}
@Test
@ -423,14 +455,24 @@ class ReactiveExtensionClientTest {
.thenReturn(Mono.just(createExtensionStore(name, 12345L)));
when(storeClient.fetchByName(name))
.thenReturn(Mono.just(createExtensionStore(name, 12346L)));
when(converter.convertFrom(same(Unstructured.class), any())).thenReturn(fake);
var oldFake = createUnstructured();
oldFake.getMetadata().setLabels(Map.of("old", "true"));
var updatedFake = createUnstructured();
updatedFake.getMetadata().setLabels(Map.of("updated", "true"));
when(converter.convertFrom(same(Unstructured.class), any()))
.thenReturn(oldFake)
.thenReturn(updatedFake);
StepVerifier.create(client.update(fake))
.expectNext(fake)
.expectNext(updatedFake)
.verifyComplete();
verify(converter, times(1)).convertTo(eq(fake));
verify(storeClient, times(1))
verify(storeClient).fetchByName(name);
verify(converter).convertTo(eq(fake));
verify(converter, times(2)).convertFrom(same(Unstructured.class), any());
verify(storeClient)
.update(eq("/registry/fake.halo.run/fakes/fake"), eq(12345L), any());
}
@ -480,6 +522,13 @@ class ReactiveExtensionClientTest {
verify(watcher, times(1)).onUpdate(any(), any());
}
@Test
void shouldNotWatchOnUpdateIfExtensionNotChange() {
shouldNotUpdateIfExtensionNotChange();
verify(watcher, never()).onUpdate(any(), any());
}
@Test
void shouldWatchOnDeleteSuccessfully() {
doNothing().when(watcher).onDelete(any());

View File

@ -4,9 +4,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.time.Duration;
import java.time.Instant;
@ -20,13 +17,13 @@ class DefaultDelayQueueTest {
Instant now = Instant.now();
DefaultDelayQueue<Request> queue;
DefaultQueue<Request> queue;
final Duration minDelay = Duration.ofMillis(1);
@BeforeEach
void setUp() {
queue = new DefaultDelayQueue<>(() -> now, minDelay);
queue = new DefaultQueue<>(() -> now, minDelay);
}
@Test
@ -82,23 +79,23 @@ class DefaultDelayQueueTest {
@Test
void shouldNotAddRepeatedlyIfNotDone() throws InterruptedException {
var entrySpy = spy(new DelayedEntry<>(newRequest("fake-name"), minDelay, () -> now));
queue = new DefaultQueue<>(() -> now, Duration.ZERO);
var fakeEntry = new DelayedEntry<>(newRequest("fake-name"), Duration.ZERO,
() -> this.now);
doReturn(0L).when(entrySpy).getDelay(any());
queue.add(entrySpy);
queue.add(fakeEntry);
assertEquals(1, queue.size());
assertEquals(entrySpy, queue.peek());
assertEquals(fakeEntry, queue.peek());
queue.take();
assertEquals(0, queue.size());
queue.add(entrySpy);
queue.add(fakeEntry);
assertEquals(0, queue.size());
queue.done(newRequest("fake-name"));
queue.add(entrySpy);
queue.add(fakeEntry);
assertEquals(1, queue.size());
assertEquals(entrySpy, queue.peek());
assertEquals(fakeEntry, queue.peek());
}
Request newRequest(String name) {