refactor: optimize comment and reply deletion (#5777)

#### What type of PR is this?
/kind improvement
/area core
/milestone 2.15.x

#### What this PR does / why we need it:
优化评论和回复删除,只有删除第一页后才会再次查询避免数据堆积

#### Does this PR introduce a user-facing change?
```release-note
None
```
pull/5787/head
guqing 2024-04-26 17:49:26 +08:00 committed by GitHub
parent ee76f19572
commit c0de807b9e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 365 additions and 16 deletions

View File

@ -4,10 +4,12 @@ import static run.halo.app.extension.index.query.QueryFactory.and;
import static run.halo.app.extension.index.query.QueryFactory.equal;
import static run.halo.app.extension.index.query.QueryFactory.isNull;
import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import java.util.function.Function;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.domain.Sort;
import org.springframework.lang.NonNull;
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
@ -15,6 +17,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import run.halo.app.core.extension.User;
import run.halo.app.core.extension.content.Comment;
import run.halo.app.core.extension.service.RoleService;
@ -147,17 +150,33 @@ public class CommentServiceImpl implements CommentService {
@Override
public Mono<Void> removeBySubject(@NonNull Ref subjectRef) {
Assert.notNull(subjectRef, "The subjectRef must not be null.");
return cleanupComments(subjectRef, 200);
}
private Mono<Void> cleanupComments(Ref subjectRef, int batchSize) {
// ascending order by creation time and name
var pageRequest = PageRequestImpl.of(1, 200,
final var pageRequest = PageRequestImpl.of(1, batchSize,
Sort.by("metadata.creationTimestamp", "metadata.name"));
return Flux.defer(() -> listCommentsByRef(subjectRef, pageRequest))
.expand(page -> page.hasNext()
? listCommentsByRef(subjectRef, pageRequest.next())
: Mono.empty()
// forever loop first page until no more to delete
return listCommentsByRef(subjectRef, pageRequest)
.flatMap(page -> Flux.fromIterable(page.getItems())
.flatMap(this::deleteWithRetry)
.then(page.hasNext() ? cleanupComments(subjectRef, batchSize) : Mono.empty())
);
}
private Mono<Comment> deleteWithRetry(Comment item) {
return client.delete(item)
.onErrorResume(OptimisticLockingFailureException.class,
e -> attemptToDelete(item.getMetadata().getName()));
}
private Mono<Comment> attemptToDelete(String name) {
return Mono.defer(() -> client.fetch(Comment.class, name)
.flatMap(client::delete)
)
.flatMap(page -> Flux.fromIterable(page.getItems()))
.flatMap(client::delete)
.then();
.retryWhen(Retry.backoff(8, Duration.ofMillis(100))
.filter(OptimisticLockingFailureException.class::isInstance));
}
Mono<ListResult<Comment>> listCommentsByRef(Ref subjectRef, PageRequest pageRequest) {

View File

@ -5,18 +5,21 @@ import static run.halo.app.extension.index.query.QueryFactory.equal;
import static run.halo.app.extension.index.query.QueryFactory.isNull;
import static run.halo.app.extension.router.selector.SelectorUtil.labelAndFieldSelectorToPredicate;
import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.domain.Sort;
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import run.halo.app.core.extension.User;
import run.halo.app.core.extension.content.Comment;
import run.halo.app.core.extension.content.Reply;
@ -118,17 +121,33 @@ public class ReplyServiceImpl implements ReplyService {
@Override
public Mono<Void> removeAllByComment(String commentName) {
Assert.notNull(commentName, "The commentName must not be null.");
return cleanupComments(commentName, 200);
}
private Mono<Void> cleanupComments(String commentName, int batchSize) {
// ascending order by creation time and name
var pageRequest = PageRequestImpl.of(1, 200,
final var pageRequest = PageRequestImpl.of(1, batchSize,
Sort.by("metadata.creationTimestamp", "metadata.name"));
return Flux.defer(() -> listRepliesByComment(commentName, pageRequest))
.expand(page -> page.hasNext()
? listRepliesByComment(commentName, pageRequest.next())
: Mono.empty()
// forever loop first page until no more to delete
return listRepliesByComment(commentName, pageRequest)
.flatMap(page -> Flux.fromIterable(page.getItems())
.flatMap(this::deleteWithRetry)
.then(page.hasNext() ? cleanupComments(commentName, batchSize) : Mono.empty())
);
}
private Mono<Reply> deleteWithRetry(Reply item) {
return client.delete(item)
.onErrorResume(OptimisticLockingFailureException.class,
e -> attemptToDelete(item.getMetadata().getName()));
}
private Mono<Reply> attemptToDelete(String name) {
return Mono.defer(() -> client.fetch(Reply.class, name)
.flatMap(client::delete)
)
.flatMap(page -> Flux.fromIterable(page.getItems()))
.flatMap(client::delete)
.then();
.retryWhen(Retry.backoff(8, Duration.ofMillis(100))
.filter(OptimisticLockingFailureException.class::isInstance));
}
Mono<ListResult<Reply>> listRepliesByComment(String commentName, PageRequest pageRequest) {

View File

@ -0,0 +1,159 @@
package run.halo.app.content.comment;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.annotation.DirtiesContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import run.halo.app.core.extension.content.Comment;
import run.halo.app.extension.Extension;
import run.halo.app.extension.ExtensionStoreUtil;
import run.halo.app.extension.GroupVersionKind;
import run.halo.app.extension.PageRequestImpl;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.Ref;
import run.halo.app.extension.SchemeManager;
import run.halo.app.extension.index.IndexerFactory;
import run.halo.app.extension.store.ReactiveExtensionStoreClient;
import run.halo.app.infra.utils.JsonUtils;
/**
* Integration tests for {@link CommentServiceImpl}.
*
* @author guqing
* @since 2.15.0
*/
class CommentServiceImplIntegrationTest {
@Nested
@DirtiesContext
@SpringBootTest
class CommentRemoveTest {
private final List<Comment> storedComments = createComments(350);
@Autowired
private SchemeManager schemeManager;
@SpyBean
private ReactiveExtensionClient reactiveClient;
@Autowired
private ReactiveExtensionStoreClient storeClient;
@Autowired
private IndexerFactory indexerFactory;
@SpyBean
private CommentServiceImpl commentService;
Mono<Extension> deleteImmediately(Extension extension) {
var name = extension.getMetadata().getName();
var scheme = schemeManager.get(extension.getClass());
// un-index
var indexer = indexerFactory.getIndexer(extension.groupVersionKind());
indexer.unIndexRecord(extension.getMetadata().getName());
// delete from db
var storeName = ExtensionStoreUtil.buildStoreName(scheme, name);
return storeClient.delete(storeName, extension.getMetadata().getVersion())
.thenReturn(extension);
}
@BeforeEach
void setUp() {
Flux.fromIterable(storedComments)
.flatMap(post -> reactiveClient.create(post))
.as(StepVerifier::create)
.expectNextCount(storedComments.size())
.verifyComplete();
}
@AfterEach
void tearDown() {
Flux.fromIterable(storedComments)
.flatMap(this::deleteImmediately)
.as(StepVerifier::create)
.expectNextCount(storedComments.size())
.verifyComplete();
}
@Test
void commentBatchDeletionTest() {
Ref ref = Ref.of("67",
GroupVersionKind.fromAPIVersionAndKind("content.halo.run/v1alpha1", "SinglePage"));
commentService.removeBySubject(ref)
.as(StepVerifier::create)
.verifyComplete();
verify(reactiveClient, times(storedComments.size())).delete(any(Comment.class));
verify(commentService, times(2)).listCommentsByRef(eq(ref), any());
commentService.listCommentsByRef(ref, PageRequestImpl.ofSize(1))
.as(StepVerifier::create)
.consumeNextWith(result -> {
assertThat(result.getTotal()).isEqualTo(0);
})
.verifyComplete();
}
List<Comment> createComments(int size) {
List<Comment> comments = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
var comment = createComment();
comment.getMetadata().setName("comment-" + i);
comments.add(comment);
}
return comments;
}
}
Comment createComment() {
return JsonUtils.jsonToObject("""
{
"spec": {
"raw": "fake-raw",
"content": "fake-content",
"owner": {
"kind": "User",
"name": "fake-user"
},
"userAgent": "",
"ipAddress": "",
"approvedTime": "2024-02-28T09:15:16.095Z",
"creationTime": "2024-02-28T06:23:42.923294424Z",
"priority": 0,
"top": false,
"allowNotification": false,
"approved": true,
"hidden": false,
"subjectRef": {
"group": "content.halo.run",
"version": "v1alpha1",
"kind": "SinglePage",
"name": "67"
},
"lastReadTime": "2024-02-29T03:39:04.230Z"
},
"apiVersion": "content.halo.run/v1alpha1",
"kind": "Comment",
"metadata": {
"generateName": "comment-"
}
}
""", Comment.class);
}
}

View File

@ -0,0 +1,152 @@
package run.halo.app.content.comment;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.annotation.DirtiesContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import run.halo.app.core.extension.content.Reply;
import run.halo.app.extension.Extension;
import run.halo.app.extension.ExtensionStoreUtil;
import run.halo.app.extension.PageRequestImpl;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.SchemeManager;
import run.halo.app.extension.index.IndexerFactory;
import run.halo.app.extension.store.ReactiveExtensionStoreClient;
import run.halo.app.infra.utils.JsonUtils;
/**
* Integration tests for {@link ReplyServiceImpl}.
*
* @author guqing
* @since 2.15.0
*/
class ReplyServiceImplIntegrationTest {
@Nested
@DirtiesContext
@SpringBootTest
class ReplyRemoveTest {
private final List<Reply> storedReplies = createReplies(320);
private List<Reply> createReplies(int size) {
List<Reply> replies = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
var reply = JsonUtils.jsonToObject(fakeReplyJson(), Reply.class);
reply.getMetadata().setName("reply-" + i);
replies.add(reply);
}
return replies;
}
@Autowired
private SchemeManager schemeManager;
@SpyBean
private ReactiveExtensionClient reactiveClient;
@Autowired
private ReactiveExtensionStoreClient storeClient;
@Autowired
private IndexerFactory indexerFactory;
@SpyBean
private ReplyServiceImpl replyService;
Mono<Extension> deleteImmediately(Extension extension) {
var name = extension.getMetadata().getName();
var scheme = schemeManager.get(extension.getClass());
// un-index
var indexer = indexerFactory.getIndexer(extension.groupVersionKind());
indexer.unIndexRecord(extension.getMetadata().getName());
// delete from db
var storeName = ExtensionStoreUtil.buildStoreName(scheme, name);
return storeClient.delete(storeName, extension.getMetadata().getVersion())
.thenReturn(extension);
}
@BeforeEach
void setUp() {
Flux.fromIterable(storedReplies)
.flatMap(post -> reactiveClient.create(post))
.as(StepVerifier::create)
.expectNextCount(storedReplies.size())
.verifyComplete();
}
@AfterEach
void tearDown() {
Flux.fromIterable(storedReplies)
.flatMap(this::deleteImmediately)
.as(StepVerifier::create)
.expectNextCount(storedReplies.size())
.verifyComplete();
}
@Test
void removeAllByComment() {
String commentName = "fake-comment";
replyService.removeAllByComment(commentName)
.as(StepVerifier::create)
.verifyComplete();
verify(reactiveClient, times(storedReplies.size())).delete(any(Reply.class));
verify(replyService, times(2)).listRepliesByComment(eq(commentName), any());
replyService.listRepliesByComment(commentName, PageRequestImpl.ofSize(1))
.as(StepVerifier::create)
.consumeNextWith(result -> assertThat(result.getTotal()).isEqualTo(0))
.verifyComplete();
}
}
String fakeReplyJson() {
return """
{
"metadata":{
"name":"fake-reply"
},
"spec":{
"raw":"fake-raw",
"content":"fake-content",
"owner":{
"kind":"User",
"name":"fake-user",
"displayName":"fake-display-name"
},
"creationTime": "2024-03-11T06:23:42.923294424Z",
"ipAddress":"",
"approved": true,
"hidden": false,
"allowNotification": false,
"top": false,
"priority": 0,
"commentName":"fake-comment"
},
"owner":{
"kind":"User",
"displayName":"fake-display-name"
},
"stats":{
"upvote":0
}
}
""";
}
}