refactor: add retry operation to single page publishing (#3422)

#### What type of PR is this?
/kind improvement
/area core
/milestone 2.3.x
#### What this PR does / why we need it:
修复初始化时自定义页面会发布失败的问题

#### Which issue(s) this PR fixes:
Fixes #3279
#### Does this PR introduce a user-facing change?
```release-note
None
```
pull/3420/head
guqing 2023-02-28 23:30:18 +08:00 committed by GitHub
parent 3146589d25
commit aba151f54c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 173 additions and 24 deletions

View File

@ -214,19 +214,20 @@ public class PostEndpoint implements CustomEndpoint {
boolean asyncPublish = request.queryParam("async") boolean asyncPublish = request.queryParam("async")
.map(Boolean::parseBoolean) .map(Boolean::parseBoolean)
.orElse(false); .orElse(false);
return client.get(Post.class, name) return Mono.defer(() -> client.get(Post.class, name)
.doOnNext(post -> { .doOnNext(post -> {
var spec = post.getSpec(); var spec = post.getSpec();
request.queryParam("headSnapshot").ifPresent(spec::setHeadSnapshot); request.queryParam("headSnapshot").ifPresent(spec::setHeadSnapshot);
spec.setPublish(true); spec.setPublish(true);
if (spec.getHeadSnapshot() == null) { if (spec.getHeadSnapshot() == null) {
spec.setHeadSnapshot(spec.getBaseSnapshot()); spec.setHeadSnapshot(spec.getBaseSnapshot());
} }
// TODO Provide release snapshot query param to control // TODO Provide release snapshot query param to control
spec.setReleaseSnapshot(spec.getHeadSnapshot()); spec.setReleaseSnapshot(spec.getHeadSnapshot());
}) })
.flatMap(client::update) .flatMap(client::update)
.retryWhen(Retry.backoff(3, Duration.ofMillis(100)) )
.retryWhen(Retry.backoff(5, Duration.ofMillis(100))
.filter(t -> t instanceof OptimisticLockingFailureException)) .filter(t -> t instanceof OptimisticLockingFailureException))
.flatMap(post -> { .flatMap(post -> {
if (asyncPublish) { if (asyncPublish) {
@ -243,7 +244,7 @@ public class PostEndpoint implements CustomEndpoint {
} }
throw new RetryException("Post publishing status is not as expected"); throw new RetryException("Post publishing status is not as expected");
}) })
.retryWhen(Retry.fixedDelay(10, Duration.ofMillis(100)) .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(200))
.filter(t -> t instanceof RetryException)) .filter(t -> t instanceof RetryException))
.doOnError(IllegalStateException.class, err -> { .doOnError(IllegalStateException.class, err -> {
log.error("Failed to publish post [{}]", name, err); log.error("Failed to publish post [{}]", name, err);

View File

@ -11,6 +11,7 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springdoc.core.fn.builders.schema.Builder; import org.springdoc.core.fn.builders.schema.Builder;
import org.springdoc.webflux.core.fn.SpringdocRouteBuilder; import org.springdoc.webflux.core.fn.SpringdocRouteBuilder;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.retry.RetryException; import org.springframework.retry.RetryException;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -189,16 +190,19 @@ public class SinglePageEndpoint implements CustomEndpoint {
boolean asyncPublish = request.queryParam("async") boolean asyncPublish = request.queryParam("async")
.map(Boolean::parseBoolean) .map(Boolean::parseBoolean)
.orElse(false); .orElse(false);
return client.fetch(SinglePage.class, name) return Mono.defer(() -> client.get(SinglePage.class, name)
.flatMap(singlePage -> { .flatMap(singlePage -> {
SinglePage.SinglePageSpec spec = singlePage.getSpec(); SinglePage.SinglePageSpec spec = singlePage.getSpec();
spec.setPublish(true); spec.setPublish(true);
if (spec.getHeadSnapshot() == null) { if (spec.getHeadSnapshot() == null) {
spec.setHeadSnapshot(spec.getBaseSnapshot()); spec.setHeadSnapshot(spec.getBaseSnapshot());
} }
spec.setReleaseSnapshot(spec.getHeadSnapshot()); spec.setReleaseSnapshot(spec.getHeadSnapshot());
return client.update(singlePage); return client.update(singlePage);
}) })
)
.retryWhen(Retry.backoff(5, Duration.ofMillis(100))
.filter(t -> t instanceof OptimisticLockingFailureException))
.flatMap(post -> { .flatMap(post -> {
if (asyncPublish) { if (asyncPublish) {
return Mono.just(post); return Mono.just(post);

View File

@ -2,6 +2,9 @@ package run.halo.app.core.extension.endpoint;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -11,12 +14,14 @@ import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import run.halo.app.content.PostRequest; import run.halo.app.content.PostRequest;
import run.halo.app.content.PostService; import run.halo.app.content.PostService;
import run.halo.app.content.TestPost; import run.halo.app.content.TestPost;
import run.halo.app.core.extension.content.Post; import run.halo.app.core.extension.content.Post;
import run.halo.app.extension.Metadata;
import run.halo.app.extension.ReactiveExtensionClient; import run.halo.app.extension.ReactiveExtensionClient;
/** /**
@ -75,6 +80,53 @@ class PostEndpointTest {
.value(post -> assertThat(post).isEqualTo(TestPost.postV1())); .value(post -> assertThat(post).isEqualTo(TestPost.postV1()));
} }
@Test
void publishRetryOnOptimisticLockingFailure() {
var post = new Post();
post.setMetadata(new Metadata());
post.getMetadata().setName("post-1");
post.setSpec(new Post.PostSpec());
when(client.get(eq(Post.class), eq("post-1"))).thenReturn(Mono.just(post));
when(client.update(any(Post.class)))
.thenReturn(Mono.error(new OptimisticLockingFailureException("fake-error")));
// Send request
webTestClient.put()
.uri("/posts/{name}/publish?async=false", "post-1")
.exchange()
.expectStatus()
.is5xxServerError();
// Verify WebClient retry behavior
verify(client, times(6)).get(eq(Post.class), eq("post-1"));
verify(client, times(6)).update(any(Post.class));
}
@Test
void publishSuccess() {
var post = new Post();
post.setMetadata(new Metadata());
post.getMetadata().setName("post-1");
post.setSpec(new Post.PostSpec());
when(client.get(eq(Post.class), eq("post-1"))).thenReturn(Mono.just(post));
when(client.fetch(eq(Post.class), eq("post-1"))).thenReturn(Mono.empty());
when(client.update(any(Post.class)))
.thenReturn(Mono.just(post));
// Send request
webTestClient.put()
.uri("/posts/{name}/publish?async=false", "post-1")
.exchange()
.expectStatus()
.is2xxSuccessful();
// Verify WebClient retry behavior
verify(client, times(1)).get(eq(Post.class), eq("post-1"));
verify(client, times(1)).update(any(Post.class));
}
PostRequest postRequest(Post post) { PostRequest postRequest(Post post) {
return new PostRequest(post, new PostRequest.Content("B", "<p>B</p>", "MARKDOWN")); return new PostRequest(post, new PostRequest.Content("B", "<p>B</p>", "MARKDOWN"));
} }

View File

@ -0,0 +1,92 @@
package run.halo.app.core.extension.endpoint;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;
import run.halo.app.core.extension.content.SinglePage;
import run.halo.app.extension.Metadata;
import run.halo.app.extension.ReactiveExtensionClient;
/**
* Tests for @{@link SinglePageEndpoint}.
*
* @author guqing
* @since 2.3.0
*/
@ExtendWith(MockitoExtension.class)
class SinglePageEndpointTest {
@Mock
private ReactiveExtensionClient client;
@InjectMocks
SinglePageEndpoint singlePageEndpoint;
WebTestClient webTestClient;
@BeforeEach
void setUp() {
webTestClient = WebTestClient
.bindToRouterFunction(singlePageEndpoint.endpoint())
.build();
}
@Test
void publishRetryOnOptimisticLockingFailure() {
var page = new SinglePage();
page.setMetadata(new Metadata());
page.getMetadata().setName("page-1");
page.setSpec(new SinglePage.SinglePageSpec());
when(client.get(eq(SinglePage.class), eq("page-1"))).thenReturn(Mono.just(page));
when(client.update(any(SinglePage.class)))
.thenReturn(Mono.error(new OptimisticLockingFailureException("fake-error")));
// Send request
webTestClient.put()
.uri("/singlepages/{name}/publish?async=false", "page-1")
.exchange()
.expectStatus()
.is5xxServerError();
// Verify WebClient retry behavior
verify(client, times(6)).get(eq(SinglePage.class), eq("page-1"));
verify(client, times(6)).update(any(SinglePage.class));
}
@Test
void publishSuccess() {
var page = new SinglePage();
page.setMetadata(new Metadata());
page.getMetadata().setName("page-1");
page.setSpec(new SinglePage.SinglePageSpec());
when(client.get(eq(SinglePage.class), eq("page-1"))).thenReturn(Mono.just(page));
when(client.fetch(eq(SinglePage.class), eq("page-1"))).thenReturn(Mono.empty());
when(client.update(any(SinglePage.class))).thenReturn(Mono.just(page));
// Send request
webTestClient.put()
.uri("/singlepages/{name}/publish?async=false", "page-1")
.exchange()
.expectStatus()
.is2xxSuccessful();
// Verify WebClient retry behavior
verify(client, times(1)).get(eq(SinglePage.class), eq("page-1"));
verify(client, times(1)).update(any(SinglePage.class));
}
}