Fix the problem of deleting failed plugins for a long time (#4002)

#### What type of PR is this?

/kind bug
/area core
/milestone 2.6.x

#### What this PR does / why we need it:

This PR fixes the problem of deleting failed plugins for a long time by replacing older delayed entry in reconciler queue.

#### Does this PR introduce a user-facing change?

```release-note
修复长时间删除失败的插件问题
```
pull/4007/head
John Niang 2023-05-29 17:10:56 +08:00 committed by GitHub
parent e95caa2c21
commit 937d48b839
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 56 additions and 1 deletions

View File

@ -3,6 +3,7 @@ package run.halo.app.extension.controller;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.locks.Lock;
@ -61,7 +62,15 @@ public class DefaultQueue<R> implements RequestQueue<R> {
entry = new DelayedEntry<>(entry.getEntry(), minDelay, nowSupplier);
}
if (dirty.contains(entry.getEntry())) {
return false;
var oldEntry = findOldEntry(entry);
if (oldEntry.isEmpty()) {
return false;
}
var oldReadyAt = oldEntry.get().getReadyAt();
var readyAt = entry.getReadyAt();
if (!readyAt.isBefore(oldReadyAt)) {
return false;
}
}
dirty.add(entry.getEntry());
if (processing.contains(entry.getEntry())) {
@ -138,4 +147,13 @@ public class DefaultQueue<R> implements RequestQueue<R> {
return this.disposed;
}
private Optional<DelayedEntry<R>> findOldEntry(DelayedEntry<R> entry) {
for (DelayedEntry<R> element : queue) {
if (element.equals(entry)) {
return Optional.of(element);
}
}
return Optional.empty();
}
}

View File

@ -56,6 +56,10 @@ public interface RequestQueue<E> extends Disposable {
return retryAfter;
}
public Instant getReadyAt() {
return readyAt;
}
@Override
public int compareTo(Delayed o) {
return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));

View File

@ -98,6 +98,38 @@ class DefaultDelayQueueTest {
assertEquals(fakeEntry, queue.peek());
}
@Test
void shouldNotAddIfHavingEarlierEntryInQueue() {
queue = new DefaultQueue<>(() -> now, Duration.ZERO);
var fakeEntry = new DelayedEntry<>(newRequest("fake-name"), Duration.ZERO,
() -> this.now);
assertTrue(queue.add(fakeEntry));
assertEquals(1, queue.size());
assertEquals(fakeEntry, queue.peek());
assertFalse(queue.add(fakeEntry));
var laterEntry = new DelayedEntry<>(newRequest("fake-name"), Duration.ofMillis(100),
() -> this.now);
assertFalse(queue.add(laterEntry));
}
@Test
void shouldAddIfHavingLaterEntryInQueue() {
queue = new DefaultQueue<>(() -> now, Duration.ZERO);
var fakeEntry = new DelayedEntry<>(newRequest("fake-name"), Duration.ofMillis(100),
() -> this.now);
assertTrue(queue.add(fakeEntry));
assertEquals(1, queue.size());
assertEquals(fakeEntry, queue.peek());
assertFalse(queue.add(fakeEntry));
var laterEntry = new DelayedEntry<>(newRequest("fake-name"), Duration.ofMillis(99),
() -> this.now);
assertTrue(queue.add(laterEntry));
}
Request newRequest(String name) {
return new Request(name);
}

View File

@ -18,6 +18,7 @@ class DelayedEntryTest {
var delayedEntry = new DelayedEntry<>("fake", Duration.ofMillis(100), () -> now);
assertEquals(100, delayedEntry.getDelay(TimeUnit.MILLISECONDS));
assertEquals(Duration.ofMillis(100), delayedEntry.getRetryAfter());
assertEquals(now.plusMillis(100), delayedEntry.getReadyAt());
assertEquals("fake", delayedEntry.getEntry());
delayedEntry = new DelayedEntry<>("fake", now.plus(Duration.ofSeconds(1)), () -> now);