[release-2.3] Fix the problem of failure to create and publish post (#3488)

This is an automated cherry-pick of #3441

/assign JohnNiang

```release-note
解决文章创建和发布经常失败的问题
```
release-2.3 v2.3.1
Halo Dev Bot 2023-03-08 17:50:12 +08:00 committed by GitHub
parent eb87116cf7
commit ecf65d2214
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 134 additions and 89 deletions

View File

@ -268,25 +268,25 @@ public class PostServiceImpl extends AbstractContentService implements PostServi
private Mono<Post> waitForPostToDraftConcludingWork(String postName, private Mono<Post> waitForPostToDraftConcludingWork(String postName,
ContentWrapper contentWrapper) { ContentWrapper contentWrapper) {
return client.fetch(Post.class, postName) return Mono.defer(() -> client.fetch(Post.class, postName)
.flatMap(post -> { .flatMap(post -> {
post.getSpec().setBaseSnapshot(contentWrapper.getSnapshotName()); post.getSpec().setBaseSnapshot(contentWrapper.getSnapshotName());
post.getSpec().setHeadSnapshot(contentWrapper.getSnapshotName()); post.getSpec().setHeadSnapshot(contentWrapper.getSnapshotName());
if (Objects.equals(true, post.getSpec().getPublish())) { if (Objects.equals(true, post.getSpec().getPublish())) {
post.getSpec().setReleaseSnapshot(post.getSpec().getHeadSnapshot()); post.getSpec().setReleaseSnapshot(post.getSpec().getHeadSnapshot());
} }
Condition condition = Condition.builder() Condition condition = Condition.builder()
.type(Post.PostPhase.DRAFT.name()) .type(Post.PostPhase.DRAFT.name())
.reason("DraftedSuccessfully") .reason("DraftedSuccessfully")
.message("Drafted post successfully.") .message("Drafted post successfully.")
.status(ConditionStatus.TRUE) .status(ConditionStatus.TRUE)
.lastTransitionTime(Instant.now()) .lastTransitionTime(Instant.now())
.build(); .build();
Post.PostStatus status = post.getStatusOrDefault(); Post.PostStatus status = post.getStatusOrDefault();
status.setPhase(Post.PostPhase.DRAFT.name()); status.setPhase(Post.PostPhase.DRAFT.name());
status.getConditionsOrDefault().addAndEvictFIFO(condition); status.getConditionsOrDefault().addAndEvictFIFO(condition);
return client.update(post); return client.update(post);
}) }))
.retryWhen(Retry.backoff(5, Duration.ofMillis(100)) .retryWhen(Retry.backoff(5, Duration.ofMillis(100))
.filter(OptimisticLockingFailureException.class::isInstance)); .filter(OptimisticLockingFailureException.class::isInstance));
} }
@ -306,11 +306,11 @@ public class PostServiceImpl extends AbstractContentService implements PostServi
return client.update(post); return client.update(post);
}); });
} }
return updateContent(baseSnapshot, postRequest.contentRequest()) return Mono.defer(() -> updateContent(baseSnapshot, postRequest.contentRequest())
.flatMap(contentWrapper -> { .flatMap(contentWrapper -> {
post.getSpec().setHeadSnapshot(contentWrapper.getSnapshotName()); post.getSpec().setHeadSnapshot(contentWrapper.getSnapshotName());
return client.update(post); return client.update(post);
}) }))
.retryWhen(Retry.backoff(5, Duration.ofMillis(100)) .retryWhen(Retry.backoff(5, Duration.ofMillis(100))
.filter(throwable -> throwable instanceof OptimisticLockingFailureException)); .filter(throwable -> throwable instanceof OptimisticLockingFailureException));
} }

View File

@ -128,25 +128,25 @@ public class SinglePageServiceImpl extends AbstractContentService implements Sin
private Mono<SinglePage> waitForPageToDraftConcludingWork(String pageName, private Mono<SinglePage> waitForPageToDraftConcludingWork(String pageName,
ContentWrapper contentWrapper) { ContentWrapper contentWrapper) {
return client.fetch(SinglePage.class, pageName) return Mono.defer(() -> client.fetch(SinglePage.class, pageName)
.flatMap(page -> { .flatMap(page -> {
page.getSpec().setBaseSnapshot(contentWrapper.getSnapshotName()); page.getSpec().setBaseSnapshot(contentWrapper.getSnapshotName());
page.getSpec().setHeadSnapshot(contentWrapper.getSnapshotName()); page.getSpec().setHeadSnapshot(contentWrapper.getSnapshotName());
if (Objects.equals(true, page.getSpec().getPublish())) { if (Objects.equals(true, page.getSpec().getPublish())) {
page.getSpec().setReleaseSnapshot(page.getSpec().getHeadSnapshot()); page.getSpec().setReleaseSnapshot(page.getSpec().getHeadSnapshot());
} }
Condition condition = Condition.builder() Condition condition = Condition.builder()
.type(Post.PostPhase.DRAFT.name()) .type(Post.PostPhase.DRAFT.name())
.reason("DraftedSuccessfully") .reason("DraftedSuccessfully")
.message("Drafted page successfully") .message("Drafted page successfully")
.status(ConditionStatus.TRUE) .status(ConditionStatus.TRUE)
.lastTransitionTime(Instant.now()) .lastTransitionTime(Instant.now())
.build(); .build();
SinglePage.SinglePageStatus status = page.getStatusOrDefault(); SinglePage.SinglePageStatus status = page.getStatusOrDefault();
status.getConditionsOrDefault().addAndEvictFIFO(condition); status.getConditionsOrDefault().addAndEvictFIFO(condition);
status.setPhase(Post.PostPhase.DRAFT.name()); status.setPhase(Post.PostPhase.DRAFT.name());
return client.update(page); return client.update(page);
}) }))
.retryWhen(Retry.backoff(5, Duration.ofMillis(100)) .retryWhen(Retry.backoff(5, Duration.ofMillis(100))
.filter(OptimisticLockingFailureException.class::isInstance) .filter(OptimisticLockingFailureException.class::isInstance)
); );
@ -167,11 +167,11 @@ public class SinglePageServiceImpl extends AbstractContentService implements Sin
return client.update(page); return client.update(page);
}); });
} }
return updateContent(baseSnapshot, pageRequest.contentRequest()) return Mono.defer(() -> updateContent(baseSnapshot, pageRequest.contentRequest())
.flatMap(contentWrapper -> { .flatMap(contentWrapper -> {
page.getSpec().setHeadSnapshot(contentWrapper.getSnapshotName()); page.getSpec().setHeadSnapshot(contentWrapper.getSnapshotName());
return client.update(page); return client.update(page);
}) }))
.retryWhen(Retry.backoff(5, Duration.ofMillis(100)) .retryWhen(Retry.backoff(5, Duration.ofMillis(100))
.filter(throwable -> throwable instanceof OptimisticLockingFailureException)); .filter(throwable -> throwable instanceof OptimisticLockingFailureException));
} }

View File

@ -7,6 +7,7 @@ import static org.springdoc.core.fn.builders.requestbody.Builder.requestBodyBuil
import io.swagger.v3.oas.annotations.enums.ParameterIn; import io.swagger.v3.oas.annotations.enums.ParameterIn;
import java.time.Duration; import java.time.Duration;
import java.util.Objects;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springdoc.core.fn.builders.schema.Builder; import org.springdoc.core.fn.builders.schema.Builder;
@ -19,7 +20,8 @@ import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse; import org.springframework.web.reactive.function.server.ServerResponse;
import org.thymeleaf.util.StringUtils; import org.springframework.web.server.ServerErrorException;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.retry.Retry; import reactor.util.retry.Retry;
import run.halo.app.content.ContentWrapper; import run.halo.app.content.ContentWrapper;
@ -229,39 +231,35 @@ public class PostEndpoint implements CustomEndpoint {
) )
.retryWhen(Retry.backoff(5, Duration.ofMillis(100)) .retryWhen(Retry.backoff(5, Duration.ofMillis(100))
.filter(t -> t instanceof OptimisticLockingFailureException)) .filter(t -> t instanceof OptimisticLockingFailureException))
.flatMap(post -> { .filter(post -> asyncPublish)
if (asyncPublish) { .switchIfEmpty(Mono.defer(() -> awaitPostPublished(name)))
return Mono.just(post); .onErrorMap(Exceptions::isRetryExhausted, err -> new ServerErrorException(
} "Post publishing failed, please try again later.", err))
return client.fetch(Post.class, name)
.map(latest -> {
String latestReleasedSnapshotName =
ExtensionUtil.nullSafeAnnotations(latest)
.get(Post.LAST_RELEASED_SNAPSHOT_ANNO);
if (StringUtils.equals(latestReleasedSnapshotName,
latest.getSpec().getReleaseSnapshot())) {
return latest;
}
throw new RetryException("Post publishing status is not as expected");
})
.retryWhen(Retry.fixedDelay(10, Duration.ofMillis(200))
.filter(t -> t instanceof RetryException))
.doOnError(IllegalStateException.class, err -> {
log.error("Failed to publish post [{}]", name, err);
throw new IllegalStateException("Publishing wait timeout.");
});
})
.flatMap(publishResult -> ServerResponse.ok().bodyValue(publishResult)); .flatMap(publishResult -> ServerResponse.ok().bodyValue(publishResult));
} }
private Mono<Post> awaitPostPublished(String postName) {
return Mono.defer(() -> client.get(Post.class, postName)
.filter(post -> {
var releasedSnapshot = ExtensionUtil.nullSafeAnnotations(post)
.get(Post.LAST_RELEASED_SNAPSHOT_ANNO);
var expectReleaseSnapshot = post.getSpec().getReleaseSnapshot();
return Objects.equals(releasedSnapshot, expectReleaseSnapshot);
})
.switchIfEmpty(Mono.error(
() -> new RetryException("Retry to check post publish status"))))
.retryWhen(Retry.fixedDelay(10, Duration.ofMillis(200))
.filter(t -> t instanceof RetryException));
}
private Mono<ServerResponse> unpublishPost(ServerRequest request) { private Mono<ServerResponse> unpublishPost(ServerRequest request) {
var name = request.pathVariable("name"); var name = request.pathVariable("name");
return client.get(Post.class, name) return Mono.defer(() -> client.get(Post.class, name)
.doOnNext(post -> { .doOnNext(post -> {
var spec = post.getSpec(); var spec = post.getSpec();
spec.setPublish(false); spec.setPublish(false);
}) })
.flatMap(client::update) .flatMap(client::update))
.retryWhen(Retry.backoff(3, Duration.ofMillis(100)) .retryWhen(Retry.backoff(3, Duration.ofMillis(100))
.filter(t -> t instanceof OptimisticLockingFailureException)) .filter(t -> t instanceof OptimisticLockingFailureException))
// TODO Fire unpublished event in reconciler in the future // TODO Fire unpublished event in reconciler in the future
@ -272,12 +270,12 @@ public class PostEndpoint implements CustomEndpoint {
private Mono<ServerResponse> recyclePost(ServerRequest request) { private Mono<ServerResponse> recyclePost(ServerRequest request) {
var name = request.pathVariable("name"); var name = request.pathVariable("name");
return client.get(Post.class, name) return Mono.defer(() -> client.get(Post.class, name)
.doOnNext(post -> { .doOnNext(post -> {
var spec = post.getSpec(); var spec = post.getSpec();
spec.setDeleted(true); spec.setDeleted(true);
}) })
.flatMap(client::update) .flatMap(client::update))
.retryWhen(Retry.backoff(3, Duration.ofMillis(100)) .retryWhen(Retry.backoff(3, Duration.ofMillis(100))
.filter(t -> t instanceof OptimisticLockingFailureException)) .filter(t -> t instanceof OptimisticLockingFailureException))
// TODO Fire recycled event in reconciler in the future // TODO Fire recycled event in reconciler in the future

View File

@ -7,6 +7,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -21,6 +22,7 @@ import run.halo.app.content.PostRequest;
import run.halo.app.content.PostService; import run.halo.app.content.PostService;
import run.halo.app.content.TestPost; import run.halo.app.content.TestPost;
import run.halo.app.core.extension.content.Post; import run.halo.app.core.extension.content.Post;
import run.halo.app.core.extension.content.Post.PostSpec;
import run.halo.app.extension.Metadata; import run.halo.app.extension.Metadata;
import run.halo.app.extension.ReactiveExtensionClient; import run.halo.app.extension.ReactiveExtensionClient;
@ -85,7 +87,7 @@ class PostEndpointTest {
var post = new Post(); var post = new Post();
post.setMetadata(new Metadata()); post.setMetadata(new Metadata());
post.getMetadata().setName("post-1"); post.getMetadata().setName("post-1");
post.setSpec(new Post.PostSpec()); post.setSpec(new PostSpec());
when(client.get(eq(Post.class), eq("post-1"))).thenReturn(Mono.just(post)); when(client.get(eq(Post.class), eq("post-1"))).thenReturn(Mono.just(post));
when(client.update(any(Post.class))) when(client.update(any(Post.class)))
@ -108,9 +110,19 @@ class PostEndpointTest {
var post = new Post(); var post = new Post();
post.setMetadata(new Metadata()); post.setMetadata(new Metadata());
post.getMetadata().setName("post-1"); post.getMetadata().setName("post-1");
post.setSpec(new Post.PostSpec()); post.setSpec(new PostSpec());
when(client.get(eq(Post.class), eq("post-1"))).thenReturn(Mono.just(post));
when(client.fetch(eq(Post.class), eq("post-1"))).thenReturn(Mono.empty()); var publishedPost = new Post();
var publishedMetadata = new Metadata();
publishedMetadata.setAnnotations(Map.of(Post.LAST_RELEASED_SNAPSHOT_ANNO, "my-release"));
publishedPost.setMetadata(publishedMetadata);
var publishedPostSpec = new PostSpec();
publishedPostSpec.setReleaseSnapshot("my-release");
publishedPost.setSpec(publishedPostSpec);
when(client.get(eq(Post.class), eq("post-1")))
.thenReturn(Mono.just(post))
.thenReturn(Mono.just(publishedPost));
when(client.update(any(Post.class))) when(client.update(any(Post.class)))
.thenReturn(Mono.just(post)); .thenReturn(Mono.just(post));
@ -123,8 +135,43 @@ class PostEndpointTest {
.is2xxSuccessful(); .is2xxSuccessful();
// Verify WebClient retry behavior // Verify WebClient retry behavior
verify(client, times(1)).get(eq(Post.class), eq("post-1")); verify(client, times(2)).get(eq(Post.class), eq("post-1"));
verify(client, times(1)).update(any(Post.class)); verify(client).update(any(Post.class));
}
@Test
void shouldFailIfWaitTimeoutForPublishedStatus() {
var post = new Post();
post.setMetadata(new Metadata());
post.getMetadata().setName("post-1");
post.setSpec(new PostSpec());
var publishedPost = new Post();
var publishedMetadata = new Metadata();
publishedMetadata.setAnnotations(
Map.of(Post.LAST_RELEASED_SNAPSHOT_ANNO, "old-my-release"));
publishedPost.setMetadata(publishedMetadata);
var publishedPostSpec = new PostSpec();
publishedPostSpec.setReleaseSnapshot("my-release");
publishedPost.setSpec(publishedPostSpec);
when(client.get(eq(Post.class), eq("post-1")))
.thenReturn(Mono.just(post))
.thenReturn(Mono.just(publishedPost));
when(client.update(any(Post.class)))
.thenReturn(Mono.just(post));
// Send request
webTestClient.put()
.uri("/posts/{name}/publish?async=false", "post-1")
.exchange()
.expectStatus()
.is5xxServerError();
// Verify WebClient retry behavior
verify(client, times(12)).get(eq(Post.class), eq("post-1"));
verify(client).update(any(Post.class));
} }
PostRequest postRequest(Post post) { PostRequest postRequest(Post post) {