From 0e17d53ede66c57b12f2dec4c92874c7e69ed9ef Mon Sep 17 00:00:00 2001 From: guqing <38999863+guqing@users.noreply.github.com> Date: Fri, 26 Apr 2024 18:26:41 +0800 Subject: [PATCH] feat: subscription support for expression-based subscribing (#5705) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### What type of PR is this? /kind feature /area core /milestone 2.15.x #### What this PR does / why we need it: 通知订阅支持基于表达式订阅 see #5632 for more details how to test it? 1. 测试系统通知功能的文章、页面有新评论通知和评论有新回复通知的功能是否正常 2. 测试 2.14 创建的文章、评论和回复升级到此版本后是否能继续收到相应通知,如文章有新评论 #### Which issue(s) this PR fixes: Fixes #5632 #### Does this PR introduce a user-facing change? ```release-note 通知订阅支持基于表达式订阅避免订阅随数据量增长同时自动优化之前的订阅数据 ``` --- api-docs/openapi/v3_0/aggregated.json | 4 + .../extension/notification/Subscription.java | 45 +++- .../CommentNotificationReasonPublisher.java | 31 +-- .../ReplyNotificationSubscriptionHelper.java | 45 +--- .../reconciler/CommentReconciler.java | 3 +- .../extension/reconciler/PostReconciler.java | 7 +- .../extension/reconciler/ReplyReconciler.java | 3 +- .../reconciler/SinglePageReconciler.java | 7 +- .../ReactiveExtensionPaginatedOperator.java | 37 ++++ ...eactiveExtensionPaginatedOperatorImpl.java | 132 +++++++++++ .../run/halo/app/infra/SchemeInitializer.java | 5 + .../DefaultNotificationCenter.java | 208 ++++-------------- .../DefaultSubscriberEmailResolver.java | 23 +- .../app/notification/RecipientResolver.java | 9 + .../notification/RecipientResolverImpl.java | 116 ++++++++++ .../run/halo/app/notification/Subscriber.java | 28 +++ .../notification/SubscriptionMigration.java | 155 +++++++++++++ .../app/notification/SubscriptionService.java | 26 +++ .../notification/SubscriptionServiceImpl.java | 103 +++++++++ .../resources/extensions/notification.yaml | 12 + ...ommentNotificationReasonPublisherTest.java | 15 -- ...plyNotificationSubscriptionHelperTest.java | 67 ++---- .../reconciler/PostReconcilerTest.java | 8 +- .../reconciler/SinglePageReconcilerTest.java | 6 +- .../reconciler/UserReconcilerTest.java | 6 + ...iveExtensionPaginatedOperatorImplTest.java | 107 +++++++++ .../DefaultNotificationCenterTest.java | 198 +++-------------- .../RecipientResolverImplTest.java | 180 +++++++++++++++ .../SubscriptionServiceImplTest.java | 74 +++++++ .../SubscriptionServiceIntegrationTest.java | 171 ++++++++++++++ docs/notification/README.md | 106 +++++---- .../api-client/src/models/interest-reason.ts | 6 + 32 files changed, 1422 insertions(+), 521 deletions(-) create mode 100644 application/src/main/java/run/halo/app/infra/ReactiveExtensionPaginatedOperator.java create mode 100644 application/src/main/java/run/halo/app/infra/ReactiveExtensionPaginatedOperatorImpl.java create mode 100644 application/src/main/java/run/halo/app/notification/RecipientResolver.java create mode 100644 application/src/main/java/run/halo/app/notification/RecipientResolverImpl.java create mode 100644 application/src/main/java/run/halo/app/notification/Subscriber.java create mode 100644 application/src/main/java/run/halo/app/notification/SubscriptionMigration.java create mode 100644 application/src/main/java/run/halo/app/notification/SubscriptionService.java create mode 100644 application/src/main/java/run/halo/app/notification/SubscriptionServiceImpl.java create mode 100644 application/src/test/java/run/halo/app/infra/ReactiveExtensionPaginatedOperatorImplTest.java create mode 100644 application/src/test/java/run/halo/app/notification/RecipientResolverImplTest.java create mode 100644 application/src/test/java/run/halo/app/notification/SubscriptionServiceImplTest.java create mode 100644 application/src/test/java/run/halo/app/notification/SubscriptionServiceIntegrationTest.java diff --git a/api-docs/openapi/v3_0/aggregated.json b/api-docs/openapi/v3_0/aggregated.json index 7dd8b1296..1fe3def70 100644 --- a/api-docs/openapi/v3_0/aggregated.json +++ b/api-docs/openapi/v3_0/aggregated.json @@ -15003,6 +15003,10 @@ ], "type": "object", "properties": { + "expression": { + "type": "string", + "description": "The expression to be interested in" + }, "reasonType": { "type": "string", "description": "The name of the reason definition to be interested in" diff --git a/api/src/main/java/run/halo/app/core/extension/notification/Subscription.java b/api/src/main/java/run/halo/app/core/extension/notification/Subscription.java index f2e998ab9..7c8655bd8 100644 --- a/api/src/main/java/run/halo/app/core/extension/notification/Subscription.java +++ b/api/src/main/java/run/halo/app/core/extension/notification/Subscription.java @@ -11,7 +11,6 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; -import lombok.ToString; import run.halo.app.extension.AbstractExtension; import run.halo.app.extension.GVK; @@ -59,6 +58,44 @@ public class Subscription extends AbstractExtension { @Schema(requiredMode = REQUIRED, description = "The subject name of reason type to be" + " interested in") private ReasonSubject subject; + + @Schema(requiredMode = NOT_REQUIRED, description = "The expression to be interested in") + private String expression; + + /** + *

Since 2.15.0, we have added a new field expression to the + * InterestReason object, so subject can be null.

+ *

In this particular scenario, when the subject is null, we assign it a + * default ReasonSubject object. The properties of this object are set to + * specific values that do not occur in actual applications, thus we can consider this as + * nonexistent data. + * The purpose of this approach is to maintain backward compatibility, even if the + * subject can be null in the new version of the code.

+ */ + public static void ensureSubjectHasValue(InterestReason interestReason) { + if (interestReason.getSubject() == null) { + interestReason.setSubject(createFallbackSubject()); + } + } + + /** + * Check if the given reason subject is a fallback subject. + */ + public static boolean isFallbackSubject(ReasonSubject reasonSubject) { + if (reasonSubject == null) { + return true; + } + var fallback = createFallbackSubject(); + return fallback.getKind().equals(reasonSubject.getKind()) + && fallback.getApiVersion().equals(reasonSubject.getApiVersion()); + } + + static ReasonSubject createFallbackSubject() { + return ReasonSubject.builder() + .apiVersion("notification.halo.run/v1alpha1") + .kind("NonexistentKind") + .build(); + } } @Data @@ -85,10 +122,14 @@ public class Subscription extends AbstractExtension { } @Data - @ToString @Schema(name = "SubscriptionSubscriber") public static class Subscriber { private String name; + + @Override + public String toString() { + return name; + } } /** diff --git a/application/src/main/java/run/halo/app/content/comment/CommentNotificationReasonPublisher.java b/application/src/main/java/run/halo/app/content/comment/CommentNotificationReasonPublisher.java index 07f61c0e4..8af96c597 100644 --- a/application/src/main/java/run/halo/app/content/comment/CommentNotificationReasonPublisher.java +++ b/application/src/main/java/run/halo/app/content/comment/CommentNotificationReasonPublisher.java @@ -1,6 +1,7 @@ package run.halo.app.content.comment; import static org.apache.commons.lang3.StringUtils.defaultIfBlank; +import static run.halo.app.content.comment.ReplyNotificationSubscriptionHelper.identityFrom; import com.fasterxml.jackson.core.type.TypeReference; import java.util.Map; @@ -29,7 +30,6 @@ import run.halo.app.extension.Ref; import run.halo.app.infra.ExternalLinkProcessor; import run.halo.app.infra.utils.JsonUtils; import run.halo.app.notification.NotificationReasonEmitter; -import run.halo.app.notification.UserIdentity; import run.halo.app.plugin.ExtensionComponentsFinder; import run.halo.app.plugin.extensionpoint.ExtensionGetter; @@ -114,6 +114,7 @@ public class CommentNotificationReasonPublisher { builder -> { var attributes = CommentOnPostReasonData.builder() .postName(subjectRef.getName()) + .postOwner(post.getSpec().getOwner()) .postTitle(post.getSpec().getTitle()) .postUrl(postUrl) .commenter(owner.getDisplayName()) @@ -144,8 +145,9 @@ public class CommentNotificationReasonPublisher { } @Builder - record CommentOnPostReasonData(String postName, String postTitle, String postUrl, - String commenter, String content, String commentName) { + record CommentOnPostReasonData(String postName, String postOwner, String postTitle, + String postUrl, String commenter, String content, + String commentName) { } } @@ -180,6 +182,7 @@ public class CommentNotificationReasonPublisher { builder -> { var attributes = CommentOnPageReasonData.builder() .pageName(subjectRef.getName()) + .pageOwner(singlePage.getSpec().getOwner()) .pageTitle(singlePage.getSpec().getTitle()) .pageUrl(pageUrl) .commenter(defaultIfBlank(owner.getDisplayName(), owner.getName())) @@ -210,8 +213,9 @@ public class CommentNotificationReasonPublisher { } @Builder - record CommentOnPageReasonData(String pageName, String pageTitle, String pageUrl, - String commenter, String content, String commentName) { + record CommentOnPageReasonData(String pageName, String pageOwner, String pageTitle, + String pageUrl, String commenter, String content, + String commentName) { } } @@ -224,13 +228,6 @@ public class CommentNotificationReasonPublisher { } } - static UserIdentity identityFrom(Comment.CommentOwner owner) { - if (Comment.CommentOwner.KIND_EMAIL.equals(owner.getKind())) { - return UserIdentity.anonymousWithEmail(owner.getName()); - } - return UserIdentity.of(owner.getName()); - } - @Component @RequiredArgsConstructor static class NewReplyReasonPublisher { @@ -272,6 +269,10 @@ public class CommentNotificationReasonPublisher { .orElse(null); var replyOwner = reply.getSpec().getOwner(); + var repliedOwner = quoteReplyOptional + .map(quoteReply -> quoteReply.getSpec().getOwner()) + .orElseGet(() -> comment.getSpec().getOwner()); + var reasonAttributesBuilder = NewReplyReasonData.builder() .commentContent(comment.getSpec().getContent()) .isQuoteReply(isQuoteReply) @@ -279,7 +280,9 @@ public class CommentNotificationReasonPublisher { .commentName(comment.getMetadata().getName()) .replier(defaultIfBlank(replyOwner.getDisplayName(), replyOwner.getName())) .content(reply.getSpec().getContent()) - .replyName(reply.getMetadata().getName()); + .replyName(reply.getMetadata().getName()) + .replyOwner(identityFrom(replyOwner).name()) + .repliedOwner(identityFrom(repliedOwner).name()); getCommentSubjectDisplay(comment.getSpec().getSubjectRef()) .ifPresent(subject -> { @@ -337,7 +340,7 @@ public class CommentNotificationReasonPublisher { String commentSubjectUrl, boolean isQuoteReply, String quoteContent, String commentName, String replier, String content, - String replyName) { + String replyName, String replyOwner, String repliedOwner) { } } } diff --git a/application/src/main/java/run/halo/app/content/comment/ReplyNotificationSubscriptionHelper.java b/application/src/main/java/run/halo/app/content/comment/ReplyNotificationSubscriptionHelper.java index 38879160b..eff83af36 100644 --- a/application/src/main/java/run/halo/app/content/comment/ReplyNotificationSubscriptionHelper.java +++ b/application/src/main/java/run/halo/app/content/comment/ReplyNotificationSubscriptionHelper.java @@ -9,7 +9,7 @@ import run.halo.app.core.extension.content.Comment; import run.halo.app.core.extension.content.Reply; import run.halo.app.core.extension.notification.Subscription; import run.halo.app.notification.NotificationCenter; -import run.halo.app.notification.SubscriberEmailResolver; +import run.halo.app.notification.UserIdentity; /** * Reply notification subscription helper. @@ -22,7 +22,6 @@ import run.halo.app.notification.SubscriberEmailResolver; public class ReplyNotificationSubscriptionHelper { private final NotificationCenter notificationCenter; - private final SubscriberEmailResolver subscriberEmailResolver; /** * Subscribe new reply reason for comment. @@ -30,13 +29,7 @@ public class ReplyNotificationSubscriptionHelper { * @param comment comment */ public void subscribeNewReplyReasonForComment(Comment comment) { - var reasonSubject = Subscription.ReasonSubject.builder() - .apiVersion(comment.getApiVersion()) - .kind(comment.getKind()) - .name(comment.getMetadata().getName()) - .build(); - subscribeReply(reasonSubject, - Identity.fromCommentOwner(comment.getSpec().getOwner())); + subscribeReply(identityFrom(comment.getSpec().getOwner())); } /** @@ -45,50 +38,36 @@ public class ReplyNotificationSubscriptionHelper { * @param reply reply */ public void subscribeNewReplyReasonForReply(Reply reply) { - var reasonSubject = Subscription.ReasonSubject.builder() - .apiVersion(reply.getApiVersion()) - .kind(reply.getKind()) - .name(reply.getMetadata().getName()) - .build(); var subjectOwner = reply.getSpec().getOwner(); - subscribeReply(reasonSubject, - Identity.fromCommentOwner(subjectOwner)); + subscribeReply(identityFrom(subjectOwner)); } - void subscribeReply(Subscription.ReasonSubject reasonSubject, - Identity identity) { + void subscribeReply(UserIdentity identity) { var subscriber = createSubscriber(identity); if (subscriber == null) { return; } var interestReason = new Subscription.InterestReason(); interestReason.setReasonType(NotificationReasonConst.SOMEONE_REPLIED_TO_YOU); - interestReason.setSubject(reasonSubject); + interestReason.setExpression("props.repliedOwner == '%s'".formatted(identity.name())); notificationCenter.subscribe(subscriber, interestReason).block(); } @Nullable - private Subscription.Subscriber createSubscriber(Identity author) { + private Subscription.Subscriber createSubscriber(UserIdentity author) { if (StringUtils.isBlank(author.name())) { return null; } - Subscription.Subscriber subscriber; - if (author.isEmail()) { - subscriber = subscriberEmailResolver.ofEmail(author.name()); - } else { - subscriber = new Subscription.Subscriber(); - subscriber.setName(author.name()); - } + Subscription.Subscriber subscriber = new Subscription.Subscriber(); + subscriber.setName(author.name()); return subscriber; } - record Identity(String name, boolean isEmail) { - public static Identity fromCommentOwner(Comment.CommentOwner commentOwner) { - if (Comment.CommentOwner.KIND_EMAIL.equals(commentOwner.getKind())) { - return new Identity(commentOwner.getName(), true); - } - return new Identity(commentOwner.getName(), false); + public static UserIdentity identityFrom(Comment.CommentOwner owner) { + if (Comment.CommentOwner.KIND_EMAIL.equals(owner.getKind())) { + return UserIdentity.anonymousWithEmail(owner.getName()); } + return UserIdentity.of(owner.getName()); } } diff --git a/application/src/main/java/run/halo/app/core/extension/reconciler/CommentReconciler.java b/application/src/main/java/run/halo/app/core/extension/reconciler/CommentReconciler.java index 5c91b6f03..4e9ce5237 100644 --- a/application/src/main/java/run/halo/app/core/extension/reconciler/CommentReconciler.java +++ b/application/src/main/java/run/halo/app/core/extension/reconciler/CommentReconciler.java @@ -67,12 +67,11 @@ public class CommentReconciler implements Reconciler { return; } if (addFinalizers(comment.getMetadata(), Set.of(FINALIZER_NAME))) { + replyNotificationSubscriptionHelper.subscribeNewReplyReasonForComment(comment); client.update(comment); eventPublisher.publishEvent(new CommentCreatedEvent(this, comment)); } - replyNotificationSubscriptionHelper.subscribeNewReplyReasonForComment(comment); - compatibleCreationTime(comment); Comment.CommentStatus status = comment.getStatusOrDefault(); status.setHasNewReply(defaultIfNull(status.getUnreadReplyCount(), 0) > 0); diff --git a/application/src/main/java/run/halo/app/core/extension/reconciler/PostReconciler.java b/application/src/main/java/run/halo/app/core/extension/reconciler/PostReconciler.java index 90b01017a..6b464d784 100644 --- a/application/src/main/java/run/halo/app/core/extension/reconciler/PostReconciler.java +++ b/application/src/main/java/run/halo/app/core/extension/reconciler/PostReconciler.java @@ -246,11 +246,8 @@ public class PostReconciler implements Reconciler { var interestReason = new Subscription.InterestReason(); interestReason.setReasonType(NotificationReasonConst.NEW_COMMENT_ON_POST); - interestReason.setSubject(Subscription.ReasonSubject.builder() - .apiVersion(post.getApiVersion()) - .kind(post.getKind()) - .name(post.getMetadata().getName()) - .build()); + interestReason.setExpression( + "props.postOwner == '%s'".formatted(post.getSpec().getOwner())); notificationCenter.subscribe(subscriber, interestReason).block(); } diff --git a/application/src/main/java/run/halo/app/core/extension/reconciler/ReplyReconciler.java b/application/src/main/java/run/halo/app/core/extension/reconciler/ReplyReconciler.java index 55a2d1503..3dc0b89c9 100644 --- a/application/src/main/java/run/halo/app/core/extension/reconciler/ReplyReconciler.java +++ b/application/src/main/java/run/halo/app/core/extension/reconciler/ReplyReconciler.java @@ -46,6 +46,7 @@ public class ReplyReconciler implements Reconciler { return; } if (addFinalizers(reply.getMetadata(), Set.of(FINALIZER_NAME))) { + replyNotificationSubscriptionHelper.subscribeNewReplyReasonForReply(reply); client.update(reply); eventPublisher.publishEvent(new ReplyCreatedEvent(this, reply)); } @@ -64,8 +65,6 @@ public class ReplyReconciler implements Reconciler { client.update(reply); - replyNotificationSubscriptionHelper.subscribeNewReplyReasonForReply(reply); - eventPublisher.publishEvent(new ReplyChangedEvent(this, reply)); }); return new Result(false, null); diff --git a/application/src/main/java/run/halo/app/core/extension/reconciler/SinglePageReconciler.java b/application/src/main/java/run/halo/app/core/extension/reconciler/SinglePageReconciler.java index 4ceecac67..08bd37a26 100644 --- a/application/src/main/java/run/halo/app/core/extension/reconciler/SinglePageReconciler.java +++ b/application/src/main/java/run/halo/app/core/extension/reconciler/SinglePageReconciler.java @@ -107,11 +107,8 @@ public class SinglePageReconciler implements Reconciler { var interestReason = new Subscription.InterestReason(); interestReason.setReasonType(NotificationReasonConst.NEW_COMMENT_ON_PAGE); - interestReason.setSubject(Subscription.ReasonSubject.builder() - .apiVersion(page.getApiVersion()) - .kind(page.getKind()) - .name(page.getMetadata().getName()) - .build()); + interestReason.setExpression( + "props.pageOwner == '%s'".formatted(page.getSpec().getOwner())); notificationCenter.subscribe(subscriber, interestReason).block(); } diff --git a/application/src/main/java/run/halo/app/infra/ReactiveExtensionPaginatedOperator.java b/application/src/main/java/run/halo/app/infra/ReactiveExtensionPaginatedOperator.java new file mode 100644 index 000000000..0feec590f --- /dev/null +++ b/application/src/main/java/run/halo/app/infra/ReactiveExtensionPaginatedOperator.java @@ -0,0 +1,37 @@ +package run.halo.app.infra; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import run.halo.app.extension.Extension; +import run.halo.app.extension.ListOptions; + +/** + * Reactive extension paginated operator to handle extensions by pagination. + * + * @author guqing + * @since 2.15.0 + */ +public interface ReactiveExtensionPaginatedOperator { + + /** + *

Deletes all data, including any new entries added during the execution of this method.

+ *

This method continuously monitors and removes data that appears throughout its runtime, + * ensuring that even data created during the deletion process is also removed.

+ */ + Mono deleteContinuously(Class type, + ListOptions listOptions); + + /** + *

Deletes only the data that existed at the start of the operation.

+ *

This method takes a snapshot of the data at the beginning and deletes only that dataset; + * any data added after the method starts will not be affected or removed.

+ */ + Flux deleteInitialBatch(Class type, + ListOptions listOptions); + + /** + *

Note that: This method can not be used for deletion operation, because + * deletion operation will change the total records.

+ */ + Flux list(Class type, ListOptions listOptions); +} diff --git a/application/src/main/java/run/halo/app/infra/ReactiveExtensionPaginatedOperatorImpl.java b/application/src/main/java/run/halo/app/infra/ReactiveExtensionPaginatedOperatorImpl.java new file mode 100644 index 000000000..f6760af0e --- /dev/null +++ b/application/src/main/java/run/halo/app/infra/ReactiveExtensionPaginatedOperatorImpl.java @@ -0,0 +1,132 @@ +package run.halo.app.infra; + +import static run.halo.app.extension.index.query.QueryFactory.isNull; + +import java.time.Duration; +import java.time.Instant; +import lombok.RequiredArgsConstructor; +import org.springframework.dao.OptimisticLockingFailureException; +import org.springframework.data.domain.Sort; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import run.halo.app.extension.Extension; +import run.halo.app.extension.ListOptions; +import run.halo.app.extension.ListResult; +import run.halo.app.extension.PageRequest; +import run.halo.app.extension.PageRequestImpl; +import run.halo.app.extension.ReactiveExtensionClient; + +@Component +@RequiredArgsConstructor +public class ReactiveExtensionPaginatedOperatorImpl implements ReactiveExtensionPaginatedOperator { + private static final int DEFAULT_PAGE_SIZE = 200; + private final ReactiveExtensionClient client; + + @Override + public Mono deleteContinuously(Class type, + ListOptions listOptions) { + var pageRequest = createPageRequest(); + return cleanupContinuously(type, listOptions, pageRequest); + } + + private Mono cleanupContinuously(Class type, + ListOptions listOptions, + PageRequest pageRequest) { + // forever loop first page until no more to delete + return pageBy(type, listOptions, pageRequest) + .flatMap(page -> Flux.fromIterable(page.getItems()) + .flatMap(client::delete) + .then(page.hasNext() ? cleanupContinuously(type, listOptions, pageRequest) + : Mono.empty()) + ); + } + + @Override + public Flux deleteInitialBatch(Class type, + ListOptions listOptions) { + var pageRequest = createPageRequest(); + var newFieldQuery = listOptions.getFieldSelector() + .andQuery(isNull("metadata.deletionTimestamp")); + listOptions.setFieldSelector(newFieldQuery); + final Instant now = Instant.now(); + + return pageBy(type, listOptions, pageRequest) + // forever loop first page until no more to delete + .expand(result -> result.hasNext() + ? pageBy(type, listOptions, pageRequest) : Mono.empty()) + .flatMap(result -> Flux.fromIterable(result.getItems())) + .takeWhile(item -> shouldTakeNext(item, now)) + .flatMap(this::deleteWithRetry); + } + + static boolean shouldTakeNext(E item, Instant now) { + var creationTimestamp = item.getMetadata().getCreationTimestamp(); + return creationTimestamp.isBefore(now) + || creationTimestamp.equals(now); + } + + @SuppressWarnings("unchecked") + Mono deleteWithRetry(E item) { + return client.delete(item) + .onErrorResume(OptimisticLockingFailureException.class, + e -> attemptToDelete((Class) item.getClass(), item.getMetadata().getName())); + } + + private Mono attemptToDelete(Class type, String name) { + return Mono.defer(() -> client.fetch(type, name) + .flatMap(client::delete) + ) + .retryWhen(Retry.backoff(8, Duration.ofMillis(100)) + .filter(OptimisticLockingFailureException.class::isInstance)); + } + + @Override + public Flux list(Class type, ListOptions listOptions) { + var pageRequest = createPageRequest(); + return list(type, listOptions, pageRequest); + } + + /** + * Paginated list all items to avoid memory overflow. + *
+     * 1. Retrieve data multiple times until all data is consumed.
+     * 2. Fetch next page if current page has more data and consumed records is less than total
+     * records.
+     * 3. Take while consumed records is less than total records.
+     * 4. totalRecords from first page to ensure new inserted data will not be counted in during
+     * querying to avoid infinite loop.
+     * 
+ */ + private Flux list(Class type, ListOptions listOptions, + PageRequest pageRequest) { + final var now = Instant.now(); + return pageBy(type, listOptions, pageRequest) + .expand(result -> { + if (result.hasNext()) { + // fetch next page + var nextPage = nextPage(result, pageRequest.getSort()); + return pageBy(type, listOptions, nextPage); + } else { + return Mono.empty(); + } + }) + .flatMap(page -> Flux.fromIterable(page.getItems())) + .takeWhile(item -> shouldTakeNext(item, now)); + } + + static PageRequest nextPage(ListResult result, Sort sort) { + return PageRequestImpl.of(result.getPage() + 1, result.getSize(), sort); + } + + private PageRequest createPageRequest() { + return PageRequestImpl.of(1, DEFAULT_PAGE_SIZE, + Sort.by("metadata.creationTimestamp", "metadata.name")); + } + + private Mono> pageBy(Class type, ListOptions listOptions, + PageRequest pageRequest) { + return client.listBy(type, listOptions, pageRequest); + } +} diff --git a/application/src/main/java/run/halo/app/infra/SchemeInitializer.java b/application/src/main/java/run/halo/app/infra/SchemeInitializer.java index 43ec39706..de4d01507 100644 --- a/application/src/main/java/run/halo/app/infra/SchemeInitializer.java +++ b/application/src/main/java/run/halo/app/infra/SchemeInitializer.java @@ -438,6 +438,11 @@ public class SchemeInitializer implements ApplicationListener subscription.getSpec().getReason().getSubject().toString())) ); + indexSpecs.add(new IndexSpec() + .setName("spec.reason.expression") + .setIndexFunc(simpleAttribute(Subscription.class, + subscription -> subscription.getSpec().getReason().getExpression())) + ); indexSpecs.add(new IndexSpec() .setName("spec.subscriber") .setIndexFunc(simpleAttribute(Subscription.class, diff --git a/application/src/main/java/run/halo/app/notification/DefaultNotificationCenter.java b/application/src/main/java/run/halo/app/notification/DefaultNotificationCenter.java index 5d4ae677c..79ab9caa1 100644 --- a/application/src/main/java/run/halo/app/notification/DefaultNotificationCenter.java +++ b/application/src/main/java/run/halo/app/notification/DefaultNotificationCenter.java @@ -1,23 +1,14 @@ package run.halo.app.notification; import static org.apache.commons.lang3.StringUtils.defaultString; -import static run.halo.app.extension.index.query.QueryFactory.and; -import static run.halo.app.extension.index.query.QueryFactory.equal; -import static run.halo.app.extension.index.query.QueryFactory.or; import java.util.HashMap; -import java.util.HashSet; import java.util.Locale; import java.util.Optional; -import java.util.function.BiPredicate; -import java.util.function.Function; import lombok.Builder; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.data.domain.Sort; import org.springframework.stereotype.Component; -import org.springframework.util.Assert; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -27,15 +18,8 @@ import run.halo.app.core.extension.notification.NotifierDescriptor; import run.halo.app.core.extension.notification.Reason; import run.halo.app.core.extension.notification.ReasonType; import run.halo.app.core.extension.notification.Subscription; -import run.halo.app.extension.GroupVersionKind; -import run.halo.app.extension.ListOptions; -import run.halo.app.extension.ListResult; import run.halo.app.extension.Metadata; -import run.halo.app.extension.PageRequest; -import run.halo.app.extension.PageRequestImpl; import run.halo.app.extension.ReactiveExtensionClient; -import run.halo.app.extension.index.query.Query; -import run.halo.app.extension.router.selector.FieldSelector; import run.halo.app.notification.endpoint.SubscriptionRouter; /** @@ -55,37 +39,33 @@ public class DefaultNotificationCenter implements NotificationCenter { private final UserNotificationPreferenceService userNotificationPreferenceService; private final NotificationTemplateRender notificationTemplateRender; private final SubscriptionRouter subscriptionRouter; + private final RecipientResolver recipientResolver; + private final SubscriptionService subscriptionService; @Override public Mono notify(Reason reason) { - var reasonSubject = reason.getSpec().getSubject(); - var subscriptionReasonSubject = Subscription.ReasonSubject.builder() - .apiVersion(reasonSubject.getApiVersion()) - .kind(reasonSubject.getKind()) - .name(reasonSubject.getName()) - .build(); - return listObservers(reason.getSpec().getReasonType(), subscriptionReasonSubject) - .doOnNext(subscription -> { + return recipientResolver.resolve(reason) + .doOnNext(subscriber -> { log.debug("Dispatching notification to subscriber [{}] for reason [{}]", - subscription.getSpec().getSubscriber(), reason.getMetadata().getName()); + subscriber, reason.getMetadata().getName()); }) .publishOn(Schedulers.boundedElastic()) - .flatMap(subscription -> dispatchNotification(reason, subscription)) + .flatMap(subscriber -> dispatchNotification(reason, subscriber)) .then(); } @Override public Mono subscribe(Subscription.Subscriber subscriber, Subscription.InterestReason reason) { - return listSubscription(subscriber, reason) - .next() - .switchIfEmpty(Mono.defer(() -> { + return unsubscribe(subscriber, reason) + .then(Mono.defer(() -> { var subscription = new Subscription(); subscription.setMetadata(new Metadata()); subscription.getMetadata().setGenerateName("subscription-"); subscription.setSpec(new Subscription.Spec()); subscription.getSpec().setUnsubscribeToken(Subscription.generateUnsubscribeToken()); subscription.getSpec().setSubscriber(subscriber); + Subscription.InterestReason.ensureSubjectHasValue(reason); subscription.getSpec().setReason(reason); return client.create(subscription); })); @@ -93,75 +73,47 @@ public class DefaultNotificationCenter implements NotificationCenter { @Override public Mono unsubscribe(Subscription.Subscriber subscriber) { - // pagination query all subscriptions of the subscriber to avoid large data - var pageRequest = PageRequestImpl.of(1, 200, - Sort.by("metadata.creationTimestamp", "metadata.name")); - return Flux.defer(() -> pageSubscriptionBy(subscriber, pageRequest)) - .expand(page -> page.hasNext() - ? pageSubscriptionBy(subscriber, pageRequest.next()) - : Mono.empty() - ) - .flatMap(page -> Flux.fromIterable(page.getItems())) - .flatMap(client::delete) - .then(); + return subscriptionService.remove(subscriber).then(); } @Override public Mono unsubscribe(Subscription.Subscriber subscriber, Subscription.InterestReason reason) { - return listSubscription(subscriber, reason) - .flatMap(client::delete) - .then(); + return subscriptionService.remove(subscriber, reason).then(); } - Mono> pageSubscriptionBy(Subscription.Subscriber subscriber, - PageRequest pageRequest) { - var listOptions = new ListOptions(); - var fieldQuery = equal("spec.subscriber", subscriber.getName()); - listOptions.setFieldSelector(FieldSelector.of(fieldQuery)); - return client.listBy(Subscription.class, listOptions, pageRequest); - } - - Flux listSubscription(Subscription.Subscriber subscriber, - Subscription.InterestReason reason) { - var listOptions = new ListOptions(); - var fieldQuery = and( - getSubscriptionFieldQuery(reason.getReasonType(), reason.getSubject()), - equal("spec.subscriber", subscriber.getName()) - ); - listOptions.setFieldSelector(FieldSelector.of(fieldQuery)); - return client.listAll(Subscription.class, listOptions, defaultSort()); - } - - Flux getNotifiersBySubscriber(Subscription.Subscriber subscriber, Reason reason) { + Flux getNotifiersBySubscriber(Subscriber subscriber, Reason reason) { var reasonType = reason.getSpec().getReasonType(); - return userNotificationPreferenceService.getByUser(subscriber.getName()) + return userNotificationPreferenceService.getByUser(subscriber.name()) .map(UserNotificationPreference::getReasonTypeNotifier) .map(reasonTypeNotification -> reasonTypeNotification.getNotifiers(reasonType)) .flatMapMany(Flux::fromIterable); } - Mono dispatchNotification(Reason reason, Subscription subscription) { - var subscriber = subscription.getSpec().getSubscriber(); + Mono dispatchNotification(Reason reason, Subscriber subscriber) { return getNotifiersBySubscriber(subscriber, reason) .flatMap(notifierName -> client.fetch(NotifierDescriptor.class, notifierName)) - .flatMap(descriptor -> prepareNotificationElement(subscription, reason, descriptor)) + .flatMap(descriptor -> prepareNotificationElement(subscriber, reason, descriptor)) .flatMap(element -> { var dispatchMono = sendNotification(element); + if (subscriber.isAnonymous()) { + return dispatchMono; + } + // create notification for user var innerNofificationMono = createNotification(element); return Mono.when(dispatchMono, innerNofificationMono); }) .then(); } - Mono prepareNotificationElement(Subscription subscription, Reason reason, + Mono prepareNotificationElement(Subscriber subscriber, Reason reason, NotifierDescriptor descriptor) { - return getLocaleFromSubscriber(subscription) - .flatMap(locale -> inferenceTemplate(reason, subscription, locale)) + return getLocaleFromSubscriber(subscriber) + .flatMap(locale -> inferenceTemplate(reason, subscriber, locale)) .map(notificationContent -> NotificationElement.builder() .descriptor(descriptor) .reason(reason) - .subscription(subscription) + .subscriber(subscriber) .reasonType(notificationContent.reasonType()) .notificationTitle(notificationContent.title()) .reasonAttributes(notificationContent.reasonAttributes()) @@ -173,7 +125,7 @@ public class DefaultNotificationCenter implements NotificationCenter { Mono sendNotification(NotificationElement notificationElement) { var descriptor = notificationElement.descriptor(); - var subscription = notificationElement.subscription(); + var subscriber = notificationElement.subscriber(); final var notifierExtName = descriptor.getSpec().getNotifierExtName(); return notificationContextFrom(notificationElement) .flatMap(notificationContext -> notificationSender.sendNotification(notifierExtName, @@ -181,7 +133,7 @@ public class DefaultNotificationCenter implements NotificationCenter { .onErrorResume(throwable -> { log.error( "Failed to send notification to subscriber [{}] through notifier [{}]", - subscription.getSpec().getSubscriber(), + subscriber, descriptor.getSpec().getDisplayName(), throwable); return Mono.empty(); @@ -192,9 +144,8 @@ public class DefaultNotificationCenter implements NotificationCenter { Mono createNotification(NotificationElement notificationElement) { var reason = notificationElement.reason(); - var subscription = notificationElement.subscription(); - var subscriber = subscription.getSpec().getSubscriber(); - return client.fetch(User.class, subscriber.getName()) + var subscriber = notificationElement.subscriber(); + return client.fetch(User.class, subscriber.name()) .flatMap(user -> { Notification notification = new Notification(); notification.setMetadata(new Metadata()); @@ -203,7 +154,7 @@ public class DefaultNotificationCenter implements NotificationCenter { notification.getSpec().setTitle(notificationElement.notificationTitle()); notification.getSpec().setRawContent(notificationElement.notificationRawBody()); notification.getSpec().setHtmlContent(notificationElement.notificationHtmlBody); - notification.getSpec().setRecipient(subscriber.getName()); + notification.getSpec().setRecipient(subscriber.name()); notification.getSpec().setReason(reason.getMetadata().getName()); notification.getSpec().setUnread(true); return client.create(notification); @@ -223,7 +174,7 @@ public class DefaultNotificationCenter implements NotificationCenter { final var descriptorName = element.descriptor().getMetadata().getName(); final var reason = element.reason(); final var descriptor = element.descriptor(); - final var subscription = element.subscription(); + final var subscriber = element.subscriber(); var messagePayload = new NotificationContext.MessagePayload(); messagePayload.setTitle(element.notificationTitle()); @@ -232,7 +183,7 @@ public class DefaultNotificationCenter implements NotificationCenter { messagePayload.setAttributes(element.reasonAttributes()); var message = new NotificationContext.Message(); - message.setRecipient(subscription.getSpec().getSubscriber().getName()); + message.setRecipient(subscriber.name()); message.setPayload(messagePayload); message.setTimestamp(reason.getMetadata().getCreationTimestamp()); var reasonSubject = reason.getSpec().getSubject(); @@ -270,25 +221,25 @@ public class DefaultNotificationCenter implements NotificationCenter { }); } - Mono inferenceTemplate(Reason reason, Subscription subscription, + Mono inferenceTemplate(Reason reason, Subscriber subscriber, Locale locale) { var reasonTypeName = reason.getSpec().getReasonType(); - var subscriber = subscription.getSpec().getSubscriber(); return getReasonType(reasonTypeName) .flatMap(reasonType -> notificationTemplateSelector.select(reasonTypeName, locale) .flatMap(template -> { final var templateContent = template.getSpec().getTemplate(); var model = toReasonAttributes(reason); - var identity = UserIdentity.of(subscriber.getName()); var subscriberInfo = new HashMap<>(); - if (identity.isAnonymous()) { - subscriberInfo.put("displayName", identity.getEmail().orElse("")); + if (subscriber.isAnonymous()) { + subscriberInfo.put("displayName", subscriber.getEmail().orElseThrow()); } else { - subscriberInfo.put("displayName", "@" + identity.name()); + subscriberInfo.put("displayName", "@" + subscriber.username()); } - subscriberInfo.put("id", subscriber.getName()); + subscriberInfo.put("id", subscriber.name()); model.put("subscriber", subscriberInfo); - model.put("unsubscribeUrl", getUnsubscribeUrl(subscription)); + + var unsubscriptionMono = getUnsubscribeUrl(subscriber.subscriptionName()) + .doOnNext(url -> model.put("unsubscribeUrl", url)); var builder = NotificationContent.builder() .reasonType(reasonType) @@ -305,7 +256,7 @@ public class DefaultNotificationCenter implements NotificationCenter { var htmlBodyMono = notificationTemplateRender .render(templateContent.getHtmlBody(), model) .doOnNext(builder::htmlBody); - return Mono.when(titleMono, rawBodyMono, htmlBodyMono) + return Mono.when(unsubscriptionMono, titleMono, rawBodyMono, htmlBodyMono) .then(Mono.fromSupplier(builder::build)); }) ); @@ -316,13 +267,14 @@ public class DefaultNotificationCenter implements NotificationCenter { ReasonAttributes reasonAttributes) { } - String getUnsubscribeUrl(Subscription subscription) { - return subscriptionRouter.getUnsubscribeUrl(subscription); + Mono getUnsubscribeUrl(String subscriptionName) { + return client.get(Subscription.class, subscriptionName) + .map(subscriptionRouter::getUnsubscribeUrl); } @Builder record NotificationElement(ReasonType reasonType, Reason reason, - Subscription subscription, NotifierDescriptor descriptor, + Subscriber subscriber, NotifierDescriptor descriptor, String notificationTitle, String notificationRawBody, String notificationHtmlBody, @@ -333,80 +285,8 @@ public class DefaultNotificationCenter implements NotificationCenter { return client.get(ReasonType.class, reasonTypeName); } - Mono getLocaleFromSubscriber(Subscription subscription) { + Mono getLocaleFromSubscriber(Subscriber subscriber) { // TODO get locale from subscriber return Mono.just(Locale.getDefault()); } - - Flux listObservers(String reasonTypeName, - Subscription.ReasonSubject reasonSubject) { - Assert.notNull(reasonTypeName, "The reasonTypeName must not be null"); - Assert.notNull(reasonSubject, "The reasonSubject must not be null"); - final var listOptions = new ListOptions(); - var fieldQuery = getSubscriptionFieldQuery(reasonTypeName, reasonSubject); - listOptions.setFieldSelector(FieldSelector.of(fieldQuery)); - return distinctByKey(client.listAll(Subscription.class, listOptions, defaultSort())); - } - - private static Query getSubscriptionFieldQuery(String reasonTypeName, - Subscription.ReasonSubject reasonSubject) { - var matchAllSubject = new Subscription.ReasonSubject(); - matchAllSubject.setKind(reasonSubject.getKind()); - matchAllSubject.setApiVersion(reasonSubject.getApiVersion()); - return and(equal("spec.reason.reasonType", reasonTypeName), - or(equal("spec.reason.subject", reasonSubject.toString()), - // source reason subject name is blank present match all - equal("spec.reason.subject", matchAllSubject.toString()) - ) - ); - } - - static Flux distinctByKey(Flux source) { - final var distinctKeyPredicate = subscriptionDistinctKeyPredicate(); - return source.distinct(Function.identity(), HashSet::new, - (set, val) -> { - for (Subscription subscription : set) { - if (distinctKeyPredicate.test(subscription, val)) { - return false; - } - } - // no duplicated return true - set.add(val); - return true; - }, - HashSet::clear); - } - - Sort defaultSort() { - return Sort.by(Sort.Order.asc("metadata.creationTimestamp"), - Sort.Order.asc("metadata.name")); - } - - static BiPredicate subscriptionDistinctKeyPredicate() { - return (a, b) -> { - if (!a.getSpec().getSubscriber().equals(b.getSpec().getSubscriber())) { - return false; - } - var reasonA = a.getSpec().getReason(); - var reasonB = b.getSpec().getReason(); - if (!reasonA.getReasonType().equals(reasonB.getReasonType())) { - return false; - } - var ars = reasonA.getSubject(); - var brs = reasonB.getSubject(); - var gvkForA = - GroupVersionKind.fromAPIVersionAndKind(ars.getApiVersion(), ars.getKind()); - var gvkForB = - GroupVersionKind.fromAPIVersionAndKind(brs.getApiVersion(), brs.getKind()); - - if (!gvkForA.groupKind().equals(gvkForB.groupKind())) { - return false; - } - // name is blank present match all - if (StringUtils.isBlank(ars.getName()) || StringUtils.isBlank(brs.getName())) { - return true; - } - return ars.getName().equals(brs.getName()); - }; - } } diff --git a/application/src/main/java/run/halo/app/notification/DefaultSubscriberEmailResolver.java b/application/src/main/java/run/halo/app/notification/DefaultSubscriberEmailResolver.java index 44dc0a22f..bc86dcdc1 100644 --- a/application/src/main/java/run/halo/app/notification/DefaultSubscriberEmailResolver.java +++ b/application/src/main/java/run/halo/app/notification/DefaultSubscriberEmailResolver.java @@ -21,13 +21,12 @@ import run.halo.app.extension.ReactiveExtensionClient; @Component @RequiredArgsConstructor public class DefaultSubscriberEmailResolver implements SubscriberEmailResolver { - private static final String SEPARATOR = "#"; - private final ReactiveExtensionClient client; @Override public Mono resolve(Subscription.Subscriber subscriber) { - if (isEmailSubscriber(subscriber)) { + var identity = UserIdentity.of(subscriber.getName()); + if (identity.isAnonymous()) { return Mono.fromSupplier(() -> getEmail(subscriber)); } return client.fetch(User.class, subscriber.getName()) @@ -44,20 +43,14 @@ public class DefaultSubscriberEmailResolver implements SubscriberEmailResolver { return subscriber; } - static boolean isEmailSubscriber(Subscription.Subscriber subscriber) { - return UserIdentity.of(subscriber.getName()).isAnonymous(); - } - @NonNull String getEmail(Subscription.Subscriber subscriber) { - if (!isEmailSubscriber(subscriber)) { - throw new IllegalStateException("The subscriber is not an email subscriber"); + var identity = UserIdentity.of(subscriber.getName()); + if (!identity.isAnonymous()) { + throw new IllegalStateException("The subscriber is not an anonymous subscriber"); } - var subscriberName = subscriber.getName(); - String email = subscriberName.substring(subscriberName.indexOf(SEPARATOR) + 1); - if (StringUtils.isBlank(email)) { - throw new IllegalStateException("The subscriber does not have an email"); - } - return email; + return identity.getEmail() + .filter(StringUtils::isNotBlank) + .orElseThrow(() -> new IllegalStateException("The subscriber does not have an email")); } } diff --git a/application/src/main/java/run/halo/app/notification/RecipientResolver.java b/application/src/main/java/run/halo/app/notification/RecipientResolver.java new file mode 100644 index 000000000..7b059e66d --- /dev/null +++ b/application/src/main/java/run/halo/app/notification/RecipientResolver.java @@ -0,0 +1,9 @@ +package run.halo.app.notification; + +import reactor.core.publisher.Flux; +import run.halo.app.core.extension.notification.Reason; + +public interface RecipientResolver { + + Flux resolve(Reason reason); +} diff --git a/application/src/main/java/run/halo/app/notification/RecipientResolverImpl.java b/application/src/main/java/run/halo/app/notification/RecipientResolverImpl.java new file mode 100644 index 000000000..e828da09c --- /dev/null +++ b/application/src/main/java/run/halo/app/notification/RecipientResolverImpl.java @@ -0,0 +1,116 @@ +package run.halo.app.notification; + +import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; + +import com.google.common.base.Throwables; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.context.expression.MapAccessor; +import org.springframework.core.convert.support.DefaultConversionService; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.EvaluationException; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.ParseException; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.DataBindingPropertyAccessor; +import org.springframework.expression.spel.support.SimpleEvaluationContext; +import org.springframework.integration.json.JsonPropertyAccessor; +import org.springframework.stereotype.Component; +import org.springframework.util.Assert; +import reactor.core.publisher.Flux; +import run.halo.app.core.extension.notification.Reason; +import run.halo.app.core.extension.notification.Subscription; + +@Slf4j +@Component +@RequiredArgsConstructor +public class RecipientResolverImpl implements RecipientResolver { + private final ExpressionParser expressionParser = new SpelExpressionParser(); + private final EvaluationContext evaluationContext = createEvaluationContext(); + private final SubscriptionService subscriptionService; + + @Override + public Flux resolve(Reason reason) { + var reasonType = reason.getSpec().getReasonType(); + return subscriptionService.listByPerPage(reasonType) + .filter(this::isNotDisabled) + .filter(subscription -> { + var interestReason = subscription.getSpec().getReason(); + if (hasSubject(interestReason)) { + return subjectMatch(subscription, reason.getSpec().getSubject()); + } else if (StringUtils.isNotBlank(interestReason.getExpression())) { + return expressionMatch(subscription.getMetadata().getName(), + interestReason.getExpression(), reason); + } + return false; + }) + .map(subscription -> { + var id = UserIdentity.of(subscription.getSpec().getSubscriber().getName()); + return new Subscriber(id, subscription.getMetadata().getName()); + }) + .distinct(Subscriber::name); + } + + boolean hasSubject(Subscription.InterestReason interestReason) { + return !Subscription.InterestReason.isFallbackSubject(interestReason.getSubject()); + } + + boolean expressionMatch(String subscriptionName, String expressionStr, Reason reason) { + try { + Expression expression = + expressionParser.parseExpression(expressionStr); + var result = expression.getValue(evaluationContext, + exprRootObject(reason), + Boolean.class); + return BooleanUtils.isTrue(result); + } catch (ParseException | EvaluationException e) { + log.debug("Failed to parse or evaluate expression for subscription [{}], skip it.", + subscriptionName, Throwables.getRootCause(e)); + return false; + } + } + + Map exprRootObject(Reason reason) { + var map = new HashMap(3, 1); + map.put("props", defaultIfNull(reason.getSpec().getAttributes(), new ReasonAttributes())); + map.put("subject", reason.getSpec().getSubject()); + map.put("author", reason.getSpec().getAuthor()); + return Collections.unmodifiableMap(map); + } + + static boolean subjectMatch(Subscription subscription, Reason.Subject reasonSubject) { + Assert.notNull(subscription, "The subscription must not be null"); + Assert.notNull(reasonSubject, "The reasonSubject must not be null"); + final var sourceSubject = subscription.getSpec().getReason().getSubject(); + + var matchSubject = new Subscription.ReasonSubject(); + matchSubject.setKind(reasonSubject.getKind()); + matchSubject.setApiVersion(reasonSubject.getApiVersion()); + + if (StringUtils.isBlank(sourceSubject.getName())) { + return sourceSubject.equals(matchSubject); + } + matchSubject.setName(reasonSubject.getName()); + return sourceSubject.equals(matchSubject); + } + + boolean isNotDisabled(Subscription subscription) { + return !subscription.getSpec().isDisabled(); + } + + EvaluationContext createEvaluationContext() { + return SimpleEvaluationContext.forPropertyAccessors( + DataBindingPropertyAccessor.forReadOnlyAccess(), + new MapAccessor(), + new JsonPropertyAccessor() + ) + .withConversionService(DefaultConversionService.getSharedInstance()) + .build(); + } +} diff --git a/application/src/main/java/run/halo/app/notification/Subscriber.java b/application/src/main/java/run/halo/app/notification/Subscriber.java new file mode 100644 index 000000000..d388c0666 --- /dev/null +++ b/application/src/main/java/run/halo/app/notification/Subscriber.java @@ -0,0 +1,28 @@ +package run.halo.app.notification; + +import java.util.Optional; +import org.springframework.util.Assert; +import run.halo.app.infra.AnonymousUserConst; + +public record Subscriber(UserIdentity identity, String subscriptionName) { + public Subscriber { + Assert.notNull(identity, "The subscriber must not be null"); + Assert.hasText(subscriptionName, "The subscription name must not be blank"); + } + + public String name() { + return identity.name(); + } + + public String username() { + return identity.isAnonymous() ? AnonymousUserConst.PRINCIPAL : identity.name(); + } + + public boolean isAnonymous() { + return identity.isAnonymous(); + } + + public Optional getEmail() { + return identity.getEmail(); + } +} diff --git a/application/src/main/java/run/halo/app/notification/SubscriptionMigration.java b/application/src/main/java/run/halo/app/notification/SubscriptionMigration.java new file mode 100644 index 000000000..c9851c506 --- /dev/null +++ b/application/src/main/java/run/halo/app/notification/SubscriptionMigration.java @@ -0,0 +1,155 @@ +package run.halo.app.notification; + +import static run.halo.app.content.NotificationReasonConst.NEW_COMMENT_ON_PAGE; +import static run.halo.app.content.NotificationReasonConst.NEW_COMMENT_ON_POST; +import static run.halo.app.content.NotificationReasonConst.SOMEONE_REPLIED_TO_YOU; +import static run.halo.app.extension.index.query.QueryFactory.and; +import static run.halo.app.extension.index.query.QueryFactory.equal; +import static run.halo.app.extension.index.query.QueryFactory.in; +import static run.halo.app.extension.index.query.QueryFactory.isNull; +import static run.halo.app.extension.index.query.QueryFactory.startsWith; + +import java.util.HashSet; +import java.util.Set; +import java.util.function.Consumer; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.lang.NonNull; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; +import run.halo.app.core.extension.User; +import run.halo.app.core.extension.notification.Subscription; +import run.halo.app.extension.ListOptions; +import run.halo.app.extension.ReactiveExtensionClient; +import run.halo.app.extension.router.selector.FieldSelector; +import run.halo.app.infra.AnonymousUserConst; +import run.halo.app.infra.ReactiveExtensionPaginatedOperator; +import run.halo.app.infra.ReactiveExtensionPaginatedOperatorImpl; + +/** + * Subscription migration to adapt to the new expression subscribe mechanism. + * + * @author guqing + * @since 2.15.0 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class SubscriptionMigration implements ApplicationListener { + private final NotificationCenter notificationCenter; + private final ReactiveExtensionClient client; + private final SubscriptionService subscriptionService; + private final ReactiveExtensionPaginatedOperator paginatedOperator; + + @Override + @Async + public void onApplicationEvent(@NonNull ApplicationStartedEvent event) { + handleAnonymousSubscription(); + cleanupUserSubscription(); + } + + private void cleanupUserSubscription() { + var listOptions = new ListOptions(); + var query = isNull("metadata.deletionTimestamp"); + listOptions.setFieldSelector(FieldSelector.of(query)); + var iterator = + new ReactiveExtensionPaginatedOperatorImpl(client); + iterator.list(User.class, listOptions) + .map(user -> user.getMetadata().getName()) + .flatMap(this::removeInternalSubscriptionForUser) + .then() + .doOnSuccess(unused -> log.info("Cleanup user subscription completed")) + .block(); + } + + private void handleAnonymousSubscription() { + log.debug("Start to collating anonymous subscription..."); + Set anonymousSubscribers = new HashSet<>(); + deleteAnonymousSubscription(subscription -> { + var name = subscription.getSpec().getSubscriber().getName(); + anonymousSubscribers.add(name); + }).block(); + if (anonymousSubscribers.isEmpty()) { + return; + } + + // anonymous only subscribe some-one-replied-to-you reason + for (String anonymousSubscriber : anonymousSubscribers) { + createSubscription(anonymousSubscriber, + SOMEONE_REPLIED_TO_YOU, + "props.repliedOwner == '%s'".formatted(anonymousSubscriber)).block(); + } + log.info("Collating anonymous subscription completed."); + } + + private Mono deleteAnonymousSubscription(Consumer consumer) { + var listOptions = new ListOptions(); + var query = and(startsWith("spec.subscriber", AnonymousUserConst.PRINCIPAL), + isNull("spec.reason.expression"), + isNull("metadata.deletionTimestamp"), + in("spec.reason.reasonType", Set.of(NEW_COMMENT_ON_POST, + NEW_COMMENT_ON_PAGE, + SOMEONE_REPLIED_TO_YOU)) + ); + listOptions.setFieldSelector(FieldSelector.of(query)); + return paginatedOperator.deleteInitialBatch(Subscription.class, listOptions) + .doOnNext(consumer) + .doOnNext(subscription -> log.debug("Deleted anonymous subscription: {}", + subscription.getMetadata().getName()) + ) + .then(); + } + + private Mono removeInternalSubscriptionForUser(String username) { + log.debug("Start to collating internal subscription for user: {}", username); + var subscriber = new Subscription.Subscriber(); + subscriber.setName(username); + + var listOptions = new ListOptions(); + var fieldQuery = and(isNull("metadata.deletionTimestamp"), + equal("spec.subscriber", subscriber.toString()), + in("spec.reason.reasonType", Set.of( + NEW_COMMENT_ON_POST, + NEW_COMMENT_ON_PAGE, + SOMEONE_REPLIED_TO_YOU + )) + ); + listOptions.setFieldSelector(FieldSelector.of(fieldQuery)); + + return subscriptionService.removeBy(listOptions) + .map(subscription -> { + var name = subscription.getSpec().getSubscriber().getName(); + var reason = subscription.getSpec().getReason(); + String expression = switch (reason.getReasonType()) { + case NEW_COMMENT_ON_POST -> "props.postOwner == '%s'".formatted(name); + case NEW_COMMENT_ON_PAGE -> "props.pageOwner == '%s'".formatted(name); + case SOMEONE_REPLIED_TO_YOU -> "props.repliedOwner == '%s'".formatted(name); + // never happen + default -> null; + }; + return new SubscriptionSummary(name, reason.getReasonType(), expression); + }) + .distinct() + .flatMap(summary -> createSubscription(summary.subscriber(), summary.reasonType(), + summary.expression())) + .then() + .doOnSuccess(unused -> + log.debug("Collating internal subscription for user: {} completed", username)); + } + + Mono createSubscription(String name, String reasonType, String expression) { + var interestReason = new Subscription.InterestReason(); + interestReason.setReasonType(reasonType); + interestReason.setExpression(expression); + var subscriber = new Subscription.Subscriber(); + subscriber.setName(name); + log.debug("Create subscription for user: {} with reasonType: {}", name, reasonType); + return notificationCenter.subscribe(subscriber, interestReason).then(); + } + + record SubscriptionSummary(String subscriber, String reasonType, String expression) { + } +} diff --git a/application/src/main/java/run/halo/app/notification/SubscriptionService.java b/application/src/main/java/run/halo/app/notification/SubscriptionService.java new file mode 100644 index 000000000..a2f5407a9 --- /dev/null +++ b/application/src/main/java/run/halo/app/notification/SubscriptionService.java @@ -0,0 +1,26 @@ +package run.halo.app.notification; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import run.halo.app.core.extension.notification.Subscription; +import run.halo.app.extension.ListOptions; + +public interface SubscriptionService { + + /** + *

List subscriptions by page one by one.only consume one page then next page will be + * loaded.

+ *

Note that: result can not be used to delete the subscription, it is only used to query the + * subscription.

+ */ + Flux listByPerPage(String reasonType); + + Mono remove(Subscription.Subscriber subscriber, + Subscription.InterestReason interestReasons); + + Mono remove(Subscription.Subscriber subscriber); + + Mono remove(Subscription subscription); + + Flux removeBy(ListOptions listOptions); +} diff --git a/application/src/main/java/run/halo/app/notification/SubscriptionServiceImpl.java b/application/src/main/java/run/halo/app/notification/SubscriptionServiceImpl.java new file mode 100644 index 000000000..3430bd0a1 --- /dev/null +++ b/application/src/main/java/run/halo/app/notification/SubscriptionServiceImpl.java @@ -0,0 +1,103 @@ +package run.halo.app.notification; + +import static run.halo.app.extension.index.query.QueryFactory.and; +import static run.halo.app.extension.index.query.QueryFactory.equal; +import static run.halo.app.extension.index.query.QueryFactory.isNull; +import static run.halo.app.extension.index.query.QueryFactory.startsWith; + +import java.time.Duration; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.springframework.dao.OptimisticLockingFailureException; +import org.springframework.stereotype.Component; +import org.springframework.util.Assert; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import run.halo.app.core.extension.notification.Subscription; +import run.halo.app.extension.ListOptions; +import run.halo.app.extension.ReactiveExtensionClient; +import run.halo.app.extension.index.query.Query; +import run.halo.app.extension.router.selector.FieldSelector; +import run.halo.app.infra.ReactiveExtensionPaginatedOperator; + +@Component +@RequiredArgsConstructor +public class SubscriptionServiceImpl implements SubscriptionService { + private final ReactiveExtensionClient client; + private final ReactiveExtensionPaginatedOperator paginatedOperator; + + @Override + public Mono remove(Subscription.Subscriber subscriber, + Subscription.InterestReason interestReason) { + Assert.notNull(subscriber, "The subscriber must not be null"); + Assert.notNull(interestReason, "The interest reason must not be null"); + var reasonType = interestReason.getReasonType(); + var expression = interestReason.getExpression(); + var subject = interestReason.getSubject(); + + var listOptions = new ListOptions(); + var fieldQuery = and(isNull("metadata.deletionTimestamp"), + equal("spec.subscriber", subscriber.toString()), + equal("spec.reason.reasonType", reasonType)); + + if (subject != null) { + fieldQuery = and(fieldQuery, reasonSubjectMatch(subject)); + } + if (StringUtils.isNotBlank(expression)) { + fieldQuery = and(fieldQuery, equal("spec.reason.expression", expression)); + } + listOptions.setFieldSelector(FieldSelector.of(fieldQuery)); + return paginatedOperator.deleteInitialBatch(Subscription.class, listOptions).then(); + } + + @Override + public Mono remove(Subscription.Subscriber subscriber) { + var listOptions = new ListOptions(); + var fieldQuery = and(isNull("metadata.deletionTimestamp"), + equal("spec.subscriber", subscriber.toString())); + listOptions.setFieldSelector(FieldSelector.of(fieldQuery)); + return paginatedOperator.deleteInitialBatch(Subscription.class, listOptions) + .then(); + } + + @Override + public Mono remove(Subscription subscription) { + return client.delete(subscription) + .onErrorResume(OptimisticLockingFailureException.class, + e -> attemptToDelete(subscription.getMetadata().getName())); + } + + @Override + public Flux removeBy(ListOptions listOptions) { + return paginatedOperator.deleteInitialBatch(Subscription.class, listOptions); + } + + @Override + public Flux listByPerPage(String reasonType) { + final var listOptions = new ListOptions(); + var fieldQuery = and(isNull("metadata.deletionTimestamp"), + equal("spec.reason.reasonType", reasonType)); + listOptions.setFieldSelector(FieldSelector.of(fieldQuery)); + return paginatedOperator.list(Subscription.class, listOptions); + } + + private Mono attemptToDelete(String subscriptionName) { + return Mono.defer(() -> client.fetch(Subscription.class, subscriptionName) + .flatMap(client::delete) + ) + .retryWhen(Retry.backoff(8, Duration.ofMillis(100)) + .filter(OptimisticLockingFailureException.class::isInstance)); + } + + Query reasonSubjectMatch(Subscription.ReasonSubject reasonSubject) { + Assert.notNull(reasonSubject, "The reasonSubject must not be null"); + if (StringUtils.isNotBlank(reasonSubject.getName())) { + return equal("spec.reason.subject", reasonSubject.toString()); + } + var matchAllSubject = new Subscription.ReasonSubject(); + matchAllSubject.setKind(reasonSubject.getKind()); + matchAllSubject.setApiVersion(reasonSubject.getApiVersion()); + return startsWith("spec.reason.subject", matchAllSubject.toString()); + } +} diff --git a/application/src/main/resources/extensions/notification.yaml b/application/src/main/resources/extensions/notification.yaml index 2e6e9dac9..2698bf67c 100644 --- a/application/src/main/resources/extensions/notification.yaml +++ b/application/src/main/resources/extensions/notification.yaml @@ -79,6 +79,9 @@ spec: - name: postName type: string description: "The name of the post." + - name: postOwner + type: string + description: "The user name of the post owner." - name: postTitle type: string - name: postUrl @@ -107,6 +110,9 @@ spec: - name: pageName type: string description: "The name of the single page." + - name: pageOwner + type: string + description: "The user name of the page owner." - name: pageTitle type: string - name: pageUrl @@ -144,6 +150,12 @@ spec: type: boolean - name: commentContent type: string + - name: repliedOwner + type: string + description: "The owner of the comment or reply that has been replied to." + - name: replyOwner + type: string + description: "The user who created the current reply." - name: replier type: string description: "The display name of the replier." diff --git a/application/src/test/java/run/halo/app/content/comment/CommentNotificationReasonPublisherTest.java b/application/src/test/java/run/halo/app/content/comment/CommentNotificationReasonPublisherTest.java index 11848c350..7f854d506 100644 --- a/application/src/test/java/run/halo/app/content/comment/CommentNotificationReasonPublisherTest.java +++ b/application/src/test/java/run/halo/app/content/comment/CommentNotificationReasonPublisherTest.java @@ -454,21 +454,6 @@ class CommentNotificationReasonPublisherTest { } } - @Test - void identityFromTest() { - var owner = new Comment.CommentOwner(); - owner.setKind(User.KIND); - owner.setName("fake-user"); - - assertThat(CommentNotificationReasonPublisher.identityFrom(owner)) - .isEqualTo(UserIdentity.of(owner.getName())); - - owner.setKind(Comment.CommentOwner.KIND_EMAIL); - owner.setName("example@example.com"); - assertThat(CommentNotificationReasonPublisher.identityFrom(owner)) - .isEqualTo(UserIdentity.anonymousWithEmail(owner.getName())); - } - static Comment createComment() { var comment = new Comment(); comment.setMetadata(new Metadata()); diff --git a/application/src/test/java/run/halo/app/content/comment/ReplyNotificationSubscriptionHelperTest.java b/application/src/test/java/run/halo/app/content/comment/ReplyNotificationSubscriptionHelperTest.java index 63efcb474..017301599 100644 --- a/application/src/test/java/run/halo/app/content/comment/ReplyNotificationSubscriptionHelperTest.java +++ b/application/src/test/java/run/halo/app/content/comment/ReplyNotificationSubscriptionHelperTest.java @@ -7,6 +7,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static run.halo.app.content.comment.ReplyNotificationSubscriptionHelper.identityFrom; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -24,9 +25,8 @@ import run.halo.app.core.extension.notification.Subscription; import run.halo.app.extension.GroupVersionKind; import run.halo.app.extension.Metadata; import run.halo.app.extension.Ref; -import run.halo.app.infra.AnonymousUserConst; import run.halo.app.notification.NotificationCenter; -import run.halo.app.notification.SubscriberEmailResolver; +import run.halo.app.notification.UserIdentity; /** * Tests for {@link ReplyNotificationSubscriptionHelper}. @@ -40,9 +40,6 @@ class ReplyNotificationSubscriptionHelperTest { @Mock NotificationCenter notificationCenter; - @Mock - SubscriberEmailResolver subscriberEmailResolver; - @InjectMocks ReplyNotificationSubscriptionHelper notificationSubscriptionHelper; @@ -51,17 +48,12 @@ class ReplyNotificationSubscriptionHelperTest { var comment = createComment(); var spyNotificationSubscriptionHelper = spy(notificationSubscriptionHelper); - doNothing().when(spyNotificationSubscriptionHelper).subscribeReply(any(), any()); + doNothing().when(spyNotificationSubscriptionHelper).subscribeReply(any(UserIdentity.class)); spyNotificationSubscriptionHelper.subscribeNewReplyReasonForComment(comment); - var reasonSubject = Subscription.ReasonSubject.builder() - .apiVersion(comment.getApiVersion()) - .kind(comment.getKind()) - .name(comment.getMetadata().getName()) - .build(); - verify(spyNotificationSubscriptionHelper).subscribeReply(eq(reasonSubject), - eq(ReplyNotificationSubscriptionHelper.Identity.fromCommentOwner( + verify(spyNotificationSubscriptionHelper).subscribeReply( + eq(ReplyNotificationSubscriptionHelper.identityFrom( comment.getSpec().getOwner())) ); } @@ -80,17 +72,12 @@ class ReplyNotificationSubscriptionHelperTest { var spyNotificationSubscriptionHelper = spy(notificationSubscriptionHelper); - doNothing().when(spyNotificationSubscriptionHelper).subscribeReply(any(), any()); + doNothing().when(spyNotificationSubscriptionHelper).subscribeReply(any(UserIdentity.class)); spyNotificationSubscriptionHelper.subscribeNewReplyReasonForReply(reply); - var reasonSubject = Subscription.ReasonSubject.builder() - .apiVersion(reply.getApiVersion()) - .kind(reply.getKind()) - .name(reply.getMetadata().getName()) - .build(); - verify(spyNotificationSubscriptionHelper).subscribeReply(eq(reasonSubject), - eq(ReplyNotificationSubscriptionHelper.Identity.fromCommentOwner( + verify(spyNotificationSubscriptionHelper).subscribeReply( + eq(ReplyNotificationSubscriptionHelper.identityFrom( reply.getSpec().getOwner())) ); } @@ -98,48 +85,38 @@ class ReplyNotificationSubscriptionHelperTest { @Test void subscribeReplyTest() { var comment = createComment(); - var reasonSubject = Subscription.ReasonSubject.builder() - .apiVersion(comment.getApiVersion()) - .kind(comment.getKind()) - .name(comment.getMetadata().getName()) - .build(); - var identity = ReplyNotificationSubscriptionHelper.Identity.fromCommentOwner( + var identity = ReplyNotificationSubscriptionHelper.identityFrom( comment.getSpec().getOwner()); when(notificationCenter.subscribe(any(), any())).thenReturn(Mono.empty()); var subscriber = new Subscription.Subscriber(); - subscriber.setName(AnonymousUserConst.PRINCIPAL + "#" + identity.name()); - when(subscriberEmailResolver.ofEmail(eq(identity.name()))) - .thenReturn(subscriber); + subscriber.setName(identity.name()); - notificationSubscriptionHelper.subscribeReply(reasonSubject, identity); + notificationSubscriptionHelper.subscribeReply(identity); var interestReason = new Subscription.InterestReason(); interestReason.setReasonType(NotificationReasonConst.SOMEONE_REPLIED_TO_YOU); - interestReason.setSubject(reasonSubject); + interestReason.setExpression("props.repliedOwner == '%s'".formatted(subscriber.getName())); verify(notificationCenter).subscribe(eq(subscriber), eq(interestReason)); - verify(subscriberEmailResolver).ofEmail(eq(identity.name())); } @Nested class IdentityTest { @Test - void createForCommentOwner() { - var commentOwner = new Comment.CommentOwner(); - commentOwner.setKind(Comment.CommentOwner.KIND_EMAIL); - commentOwner.setName("example@example.com"); + void identityFromTest() { + var owner = new Comment.CommentOwner(); + owner.setKind(User.KIND); + owner.setName("fake-user"); - var sub = ReplyNotificationSubscriptionHelper.Identity.fromCommentOwner(commentOwner); - assertThat(sub.isEmail()).isTrue(); - assertThat(sub.name()).isEqualTo(commentOwner.getName()); + assertThat(identityFrom(owner)) + .isEqualTo(UserIdentity.of(owner.getName())); - commentOwner.setKind(User.KIND); - commentOwner.setName("fake-user"); - sub = ReplyNotificationSubscriptionHelper.Identity.fromCommentOwner(commentOwner); - assertThat(sub.isEmail()).isFalse(); - assertThat(sub.name()).isEqualTo(commentOwner.getName()); + owner.setKind(Comment.CommentOwner.KIND_EMAIL); + owner.setName("example@example.com"); + assertThat(identityFrom(owner)) + .isEqualTo(UserIdentity.anonymousWithEmail(owner.getName())); } } diff --git a/application/src/test/java/run/halo/app/core/extension/reconciler/PostReconcilerTest.java b/application/src/test/java/run/halo/app/core/extension/reconciler/PostReconcilerTest.java index be1597140..005ca8b68 100644 --- a/application/src/test/java/run/halo/app/core/extension/reconciler/PostReconcilerTest.java +++ b/application/src/test/java/run/halo/app/core/extension/reconciler/PostReconcilerTest.java @@ -149,7 +149,7 @@ class PostReconcilerTest { when(client.fetch(eq(Post.class), eq(name))) .thenReturn(Optional.of(post)); when(postService.getContent(eq(post.getSpec().getReleaseSnapshot()), - eq(post.getSpec().getBaseSnapshot()))) + eq(post.getSpec().getBaseSnapshot()))) .thenReturn(Mono.just(ContentWrapper.builder() .snapshotName(post.getSpec().getHeadSnapshot()) .raw("hello world") @@ -215,11 +215,7 @@ class PostReconcilerTest { assertArg(argReason -> { var interestReason = new Subscription.InterestReason(); interestReason.setReasonType(NotificationReasonConst.NEW_COMMENT_ON_POST); - interestReason.setSubject(Subscription.ReasonSubject.builder() - .apiVersion(post.getApiVersion()) - .kind(post.getKind()) - .name(post.getMetadata().getName()) - .build()); + interestReason.setExpression("props.postOwner == 'null'"); assertThat(argReason).isEqualTo(interestReason); })); } diff --git a/application/src/test/java/run/halo/app/core/extension/reconciler/SinglePageReconcilerTest.java b/application/src/test/java/run/halo/app/core/extension/reconciler/SinglePageReconcilerTest.java index 9ee8f9139..e5dc4d4c9 100644 --- a/application/src/test/java/run/halo/app/core/extension/reconciler/SinglePageReconcilerTest.java +++ b/application/src/test/java/run/halo/app/core/extension/reconciler/SinglePageReconcilerTest.java @@ -232,11 +232,7 @@ class SinglePageReconcilerTest { assertArg(argReason -> { var interestReason = new Subscription.InterestReason(); interestReason.setReasonType(NotificationReasonConst.NEW_COMMENT_ON_PAGE); - interestReason.setSubject(Subscription.ReasonSubject.builder() - .apiVersion(page.getApiVersion()) - .kind(page.getKind()) - .name(page.getMetadata().getName()) - .build()); + interestReason.setExpression("props.pageOwner == 'null'"); assertThat(argReason).isEqualTo(interestReason); })); } diff --git a/application/src/test/java/run/halo/app/core/extension/reconciler/UserReconcilerTest.java b/application/src/test/java/run/halo/app/core/extension/reconciler/UserReconcilerTest.java index 6f1aac113..b83e1d3b0 100644 --- a/application/src/test/java/run/halo/app/core/extension/reconciler/UserReconcilerTest.java +++ b/application/src/test/java/run/halo/app/core/extension/reconciler/UserReconcilerTest.java @@ -22,6 +22,7 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import run.halo.app.core.extension.Role; import run.halo.app.core.extension.RoleBinding; import run.halo.app.core.extension.User; @@ -31,6 +32,7 @@ import run.halo.app.extension.Metadata; import run.halo.app.extension.controller.Reconciler; import run.halo.app.infra.AnonymousUserConst; import run.halo.app.infra.ExternalUrlSupplier; +import run.halo.app.notification.NotificationCenter; /** * Tests for {@link UserReconciler}. @@ -46,6 +48,9 @@ class UserReconcilerTest { @Mock private ExtensionClient client; + @Mock + private NotificationCenter notificationCenter; + @Mock private RoleService roleService; @@ -54,6 +59,7 @@ class UserReconcilerTest { @BeforeEach void setUp() { + lenient().when(notificationCenter.unsubscribe(any(), any())).thenReturn(Mono.empty()); lenient().when(roleService.listRoleRefs(any())).thenReturn(Flux.empty()); } diff --git a/application/src/test/java/run/halo/app/infra/ReactiveExtensionPaginatedOperatorImplTest.java b/application/src/test/java/run/halo/app/infra/ReactiveExtensionPaginatedOperatorImplTest.java new file mode 100644 index 000000000..8f1dce7f9 --- /dev/null +++ b/application/src/test/java/run/halo/app/infra/ReactiveExtensionPaginatedOperatorImplTest.java @@ -0,0 +1,107 @@ +package run.halo.app.infra; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.data.domain.Sort; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import run.halo.app.extension.FakeExtension; +import run.halo.app.extension.ListOptions; +import run.halo.app.extension.ListResult; +import run.halo.app.extension.Metadata; +import run.halo.app.extension.PageRequest; +import run.halo.app.extension.ReactiveExtensionClient; + +@ExtendWith(MockitoExtension.class) +class ReactiveExtensionPaginatedOperatorImplTest { + + @Mock + private ReactiveExtensionClient client; + + @InjectMocks + private ReactiveExtensionPaginatedOperatorImpl service; + + @Nested + class ListTest { + + @BeforeEach + void setUp() { + Instant now = Instant.now(); + var items = new ArrayList<>(); + // Generate 900 items + for (int j = 0; j < 9; j++) { + items.addAll(generateItems(100, now)); + } + // mock new items during the process + Instant otherNow = now.plusSeconds(1000); + items.addAll(generateItems(90, otherNow)); + + when(client.listBy(any(), any(), any())).thenAnswer(invocation -> { + PageRequest pageRequest = invocation.getArgument(2); + int pageNumber = pageRequest.getPageNumber(); + var list = ListResult.subList(items, pageNumber, pageRequest.getPageSize()); + var result = new ListResult<>(pageNumber, pageRequest.getPageSize(), + items.size(), list); + return Mono.just(result); + }); + } + + @Test + public void listTest() { + StepVerifier.create(service.list(FakeExtension.class, new ListOptions())) + .expectNextCount(900) + .verifyComplete(); + } + } + + @Test + void nextPageTest() { + var result = new ListResult(1, 10, 30, List.of()); + var sort = Sort.by("metadata.creationTimestamp"); + var nextPage = ReactiveExtensionPaginatedOperatorImpl.nextPage(result, sort); + assertThat(nextPage.getPageNumber()).isEqualTo(2); + assertThat(nextPage.getPageSize()).isEqualTo(10); + assertThat(nextPage.getSort()).isEqualTo(sort); + } + + @Test + void shouldTakeNextTest() { + var now = Instant.now(); + var item = new FakeExtension(); + item.setMetadata(new Metadata()); + item.getMetadata().setCreationTimestamp(now); + var result = ReactiveExtensionPaginatedOperatorImpl.shouldTakeNext(item, now); + assertThat(result).isTrue(); + + item.getMetadata().setCreationTimestamp(now.minusSeconds(1)); + result = ReactiveExtensionPaginatedOperatorImpl.shouldTakeNext(item, now); + assertThat(result).isTrue(); + + item.getMetadata().setCreationTimestamp(now.plusSeconds(1)); + result = ReactiveExtensionPaginatedOperatorImpl.shouldTakeNext(item, now); + assertThat(result).isFalse(); + } + + private List generateItems(int count, Instant creationTimestamp) { + List items = new ArrayList<>(); + for (int i = 0; i < count; i++) { + var item = new FakeExtension(); + item.setMetadata(new Metadata()); + item.getMetadata().setCreationTimestamp(creationTimestamp); + items.add(item); + } + return items; + } +} \ No newline at end of file diff --git a/application/src/test/java/run/halo/app/notification/DefaultNotificationCenterTest.java b/application/src/test/java/run/halo/app/notification/DefaultNotificationCenterTest.java index 5c97e1b5b..fdbb4cc45 100644 --- a/application/src/test/java/run/halo/app/notification/DefaultNotificationCenterTest.java +++ b/application/src/test/java/run/halo/app/notification/DefaultNotificationCenterTest.java @@ -13,7 +13,6 @@ import static org.mockito.Mockito.when; import java.util.List; import java.util.Locale; -import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; @@ -29,12 +28,8 @@ import run.halo.app.core.extension.notification.NotifierDescriptor; import run.halo.app.core.extension.notification.Reason; import run.halo.app.core.extension.notification.ReasonType; import run.halo.app.core.extension.notification.Subscription; -import run.halo.app.extension.GroupVersion; -import run.halo.app.extension.ListResult; import run.halo.app.extension.Metadata; -import run.halo.app.extension.PageRequest; import run.halo.app.extension.ReactiveExtensionClient; -import run.halo.app.infra.utils.JsonUtils; /** * Tests for {@link DefaultNotificationCenter}. @@ -60,6 +55,12 @@ class DefaultNotificationCenterTest { @Mock private NotificationSender notificationSender; + @Mock + private RecipientResolver recipientResolver; + + @Mock + private SubscriptionService subscriptionService; + @InjectMocks private DefaultNotificationCenter notificationCenter; @@ -78,21 +79,17 @@ class DefaultNotificationCenterTest { reason.setMetadata(new Metadata()); reason.getMetadata().setName("reason-a"); - var subscriptionReasonSubject = createNewReplyOnCommentSubject(); - var spyNotificationCenter = spy(notificationCenter); - var subscriptions = createSubscriptions(); - doReturn(Flux.fromIterable(subscriptions)) - .when(spyNotificationCenter).listObservers(eq("new-reply-on-comment"), - eq(subscriptionReasonSubject)); + var subscriber = new Subscriber(UserIdentity.anonymousWithEmail("A"), "fake-name"); + when(recipientResolver.resolve(reason)).thenReturn(Flux.just(subscriber)); + doReturn(Mono.empty()).when(spyNotificationCenter) .dispatchNotification(eq(reason), any()); spyNotificationCenter.notify(reason).block(); verify(spyNotificationCenter).dispatchNotification(eq(reason), any()); - verify(spyNotificationCenter).listObservers(eq("new-reply-on-comment"), - eq(subscriptionReasonSubject)); + verify(recipientResolver).resolve(eq(reason)); } List createSubscriptions() { @@ -129,69 +126,16 @@ class DefaultNotificationCenterTest { var reason = subscription.getSpec().getReason(); - doReturn(Flux.just(subscription)) - .when(spyNotificationCenter).listSubscription(eq(subscriber), eq(reason)); + doReturn(Mono.empty()) + .when(spyNotificationCenter).unsubscribe(eq(subscriber), eq(reason)); when(client.create(any(Subscription.class))).thenReturn(Mono.empty()); spyNotificationCenter.subscribe(subscriber, reason).block(); - verify(client, times(0)).create(any(Subscription.class)); - - // not exists subscription will create a new subscription - var newReason = JsonUtils.deepCopy(reason); - newReason.setReasonType("fake-reason-type"); - doReturn(Flux.empty()) - .when(spyNotificationCenter).listSubscription(eq(subscriber), eq(newReason)); - spyNotificationCenter.subscribe(subscriber, newReason).block(); verify(client).create(any(Subscription.class)); } - @Test - public void testUnsubscribe() { - Subscription.Subscriber subscriber = new Subscription.Subscriber(); - subscriber.setName("anonymousUser#A"); - var spyNotificationCenter = spy(notificationCenter); - var subscriptions = createSubscriptions(); - - doReturn(Mono.just(new ListResult<>(subscriptions))) - .when(spyNotificationCenter).pageSubscriptionBy(eq(subscriber), any(PageRequest.class)); - - when(client.delete(any(Subscription.class))).thenReturn(Mono.empty()); - - spyNotificationCenter.unsubscribe(subscriber).block(); - - verify(client).delete(any(Subscription.class)); - } - - - @Test - public void testUnsubscribeWithReason() { - var spyNotificationCenter = spy(notificationCenter); - var subscriptions = createSubscriptions(); - - var subscription = subscriptions.get(0); - - var subscriber = subscription.getSpec().getSubscriber(); - - var reason = subscription.getSpec().getReason(); - - var newReason = JsonUtils.deepCopy(reason); - newReason.setReasonType("fake-reason-type"); - doReturn(Flux.empty()) - .when(spyNotificationCenter).listSubscription(eq(subscriber), eq(newReason)); - when(client.delete(any(Subscription.class))).thenReturn(Mono.empty()); - spyNotificationCenter.unsubscribe(subscriber, newReason).block(); - verify(client, times(0)).delete(any(Subscription.class)); - - doReturn(Flux.just(subscription)) - .when(spyNotificationCenter).listSubscription(eq(subscriber), eq(reason)); - - // exists subscription will be deleted - spyNotificationCenter.unsubscribe(subscriber, reason).block(); - verify(client).delete(any(Subscription.class)); - } - @Test public void testGetNotifiersBySubscriber() { UserNotificationPreference preference = new UserNotificationPreference(); @@ -203,8 +147,7 @@ class DefaultNotificationCenterTest { reason.getMetadata().setName("reason-a"); reason.setSpec(new Reason.Spec()); reason.getSpec().setReasonType("new-reply-on-comment"); - var subscriber = new Subscription.Subscriber(); - subscriber.setName("anonymousUser#A"); + var subscriber = new Subscriber(UserIdentity.anonymousWithEmail("A"), "fake-name"); notificationCenter.getNotifiersBySubscriber(subscriber, reason) .collectList() @@ -215,7 +158,7 @@ class DefaultNotificationCenterTest { }) .verifyComplete(); - verify(userNotificationPreferenceService).getByUser(eq(subscriber.getName())); + verify(userNotificationPreferenceService).getByUser(eq(subscriber.name())); } @Test @@ -234,7 +177,6 @@ class DefaultNotificationCenterTest { .when(spyNotificationCenter).prepareNotificationElement(any(), any(), any()); doReturn(Mono.empty()).when(spyNotificationCenter).sendNotification(any()); - doReturn(Mono.empty()).when(spyNotificationCenter).createNotification(any()); var reason = new Reason(); reason.setMetadata(new Metadata()); @@ -243,11 +185,15 @@ class DefaultNotificationCenterTest { reason.getSpec().setReasonType("new-reply-on-comment"); var subscription = createSubscriptions().get(0); - spyNotificationCenter.dispatchNotification(reason, subscription).block(); + var subscriptionName = subscription.getMetadata().getName(); + var subscriber = + new Subscriber(UserIdentity.of(subscription.getSpec().getSubscriber().getName()), + subscriptionName); + spyNotificationCenter.dispatchNotification(reason, subscriber).block(); verify(client).fetch(eq(NotifierDescriptor.class), eq("email-notifier")); verify(spyNotificationCenter).sendNotification(any()); - verify(spyNotificationCenter).createNotification(any()); + verify(spyNotificationCenter, times(0)).createNotification(any()); } @Test @@ -282,7 +228,7 @@ class DefaultNotificationCenterTest { var element = mock(DefaultNotificationCenter.NotificationElement.class); var mockDescriptor = mock(NotifierDescriptor.class); when(element.descriptor()).thenReturn(mockDescriptor); - when(element.subscription()).thenReturn(mock(Subscription.class)); + when(element.subscriber()).thenReturn(mock(Subscriber.class)); var notifierDescriptorSpec = mock(NotifierDescriptor.Spec.class); when(mockDescriptor.getSpec()).thenReturn(notifierDescriptorSpec); when(notifierDescriptorSpec.getNotifierExtName()).thenReturn("fake-notifier-ext"); @@ -299,9 +245,12 @@ class DefaultNotificationCenterTest { var subscription = createSubscriptions().get(0); var user = mock(User.class); - var subscriberName = subscription.getSpec().getSubscriber().getName(); - when(client.fetch(eq(User.class), eq(subscriberName))).thenReturn(Mono.just(user)); - when(element.subscription()).thenReturn(subscription); + var subscriptionName = subscription.getMetadata().getName(); + var subscriber = + new Subscriber(UserIdentity.of(subscription.getSpec().getSubscriber().getName()), + subscriptionName); + when(client.fetch(eq(User.class), eq(subscriber.name()))).thenReturn(Mono.just(user)); + when(element.subscriber()).thenReturn(subscriber); when(client.create(any(Notification.class))).thenReturn(Mono.empty()); @@ -314,7 +263,7 @@ class DefaultNotificationCenterTest { notificationCenter.createNotification(element).block(); - verify(client).fetch(eq(User.class), eq(subscriberName)); + verify(client).fetch(eq(User.class), eq(subscriber.name())); verify(client).create(any(Notification.class)); } @@ -334,8 +283,8 @@ class DefaultNotificationCenterTest { doReturn(Mono.just(reasonType)) .when(spyNotificationCenter).getReasonType(eq(reasonTypeName)); - doReturn("fake-url") - .when(spyNotificationCenter).getUnsubscribeUrl(any()); + doReturn(Mono.just("fake-unsubscribe-url")) + .when(spyNotificationCenter).getUnsubscribeUrl(anyString()); final var locale = Locale.CHINESE; @@ -356,98 +305,17 @@ class DefaultNotificationCenterTest { when(notificationTemplateSelector.select(eq(reasonTypeName), any())) .thenReturn(Mono.just(template)); - var subscription = new Subscription(); - subscription.setSpec(new Subscription.Spec()); - var subscriber = new Subscription.Subscriber(); - subscriber.setName("anonymousUser#A"); - subscription.getSpec().setSubscriber(subscriber); + var subscriber = new Subscriber(UserIdentity.anonymousWithEmail("A"), "fake-name"); - spyNotificationCenter.inferenceTemplate(reason, subscription, locale).block(); + spyNotificationCenter.inferenceTemplate(reason, subscriber, locale).block(); verify(spyNotificationCenter).getReasonType(eq(reasonTypeName)); verify(notificationTemplateSelector).select(eq(reasonTypeName), any()); } - - @Test - void listObserverWhenDuplicateSubscribers() { - var sourceSubscriptions = createSubscriptions(); - var subscriptionA = sourceSubscriptions.get(0); - var subscriptionB = JsonUtils.deepCopy(subscriptionA); - - var subscriptionC = JsonUtils.deepCopy(subscriptionA); - subscriptionC.getSpec().getReason().getSubject().setName(null); - - var subscriptions = Flux.just(subscriptionA, subscriptionB, subscriptionC); - - DefaultNotificationCenter.distinctByKey(subscriptions) - .as(StepVerifier::create) - .expectNext(subscriptionA) - .verifyComplete(); - } - - @Nested - class SubscriptionDistinctKeyPredicateTest { - - @Test - void differentSubjectName() { - var subscriptionA = createSubscriptions().get(0); - var subscriptionB = JsonUtils.deepCopy(subscriptionA); - assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate() - .test(subscriptionA, subscriptionB)).isTrue(); - - subscriptionB.getSpec().getReason().getSubject().setName("other"); - assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate() - .test(subscriptionA, subscriptionB)).isFalse(); - - subscriptionB.getSpec().getReason().getSubject().setName(null); - assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate() - .test(subscriptionA, subscriptionB)).isTrue(); - } - - @Test - void differentSubjectApiVersion() { - var subscriptionA = createSubscriptions().get(0); - var subscriptionB = JsonUtils.deepCopy(subscriptionA); - assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate() - .test(subscriptionA, subscriptionB)).isTrue(); - - var subjectA = subscriptionA.getSpec().getReason().getSubject(); - var gvForA = GroupVersion.parseAPIVersion(subjectA.getApiVersion()); - subscriptionB.getSpec().getReason().getSubject() - .setApiVersion(gvForA.group() + "/otherVersion"); - assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate() - .test(subscriptionA, subscriptionB)).isTrue(); - } - - @Test - void differentReasonType() { - var subscriptionA = createSubscriptions().get(0); - var subscriptionB = JsonUtils.deepCopy(subscriptionA); - assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate() - .test(subscriptionA, subscriptionB)).isTrue(); - - subscriptionB.getSpec().getReason().setReasonType("other"); - assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate() - .test(subscriptionA, subscriptionB)).isFalse(); - } - - @Test - void differentSubscriber() { - var subscriptionA = createSubscriptions().get(0); - var subscriptionB = JsonUtils.deepCopy(subscriptionA); - assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate() - .test(subscriptionA, subscriptionB)).isTrue(); - - subscriptionB.getSpec().getSubscriber().setName("other"); - assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate() - .test(subscriptionA, subscriptionB)).isFalse(); - } - } - @Test void getLocaleFromSubscriberTest() { - var subscription = mock(Subscription.class); + var subscription = mock(Subscriber.class); notificationCenter.getLocaleFromSubscriber(subscription) .as(StepVerifier::create) diff --git a/application/src/test/java/run/halo/app/notification/RecipientResolverImplTest.java b/application/src/test/java/run/halo/app/notification/RecipientResolverImplTest.java new file mode 100644 index 000000000..276019d3a --- /dev/null +++ b/application/src/test/java/run/halo/app/notification/RecipientResolverImplTest.java @@ -0,0 +1,180 @@ +package run.halo.app.notification; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; +import run.halo.app.core.extension.notification.Reason; +import run.halo.app.core.extension.notification.Subscription; +import run.halo.app.extension.Metadata; + +/** + * Tests for {@link RecipientResolverImpl}. + * + * @author guqing + * @since 2.15.0 + */ +@ExtendWith(MockitoExtension.class) +class RecipientResolverImplTest { + + @Mock + private SubscriptionService subscriptionService; + + @InjectMocks + private RecipientResolverImpl recipientResolver; + + @Test + void testExpressionMatch() { + var subscriber1 = new Subscription.Subscriber(); + subscriber1.setName("test"); + final var subscription1 = createSubscription(subscriber1); + subscription1.getMetadata().setName("test-subscription"); + subscription1.getSpec().getReason().setSubject(null); + subscription1.getSpec().getReason().setExpression("props.owner == 'test'"); + + var subscriber2 = new Subscription.Subscriber(); + subscriber2.setName("guqing"); + final var subscription2 = createSubscription(subscriber2); + subscription2.getMetadata().setName("guqing-subscription"); + subscription2.getSpec().getReason().setSubject(null); + subscription2.getSpec().getReason().setExpression("props.owner == 'guqing'"); + + var reason = new Reason(); + reason.setSpec(new Reason.Spec()); + reason.getSpec().setReasonType("new-comment-on-post"); + reason.getSpec().setSubject(new Reason.Subject()); + reason.getSpec().getSubject().setApiVersion("content.halo.run/v1alpha1"); + reason.getSpec().getSubject().setKind("Post"); + reason.getSpec().getSubject().setName("fake-post"); + var reasonAttributes = new ReasonAttributes(); + reasonAttributes.put("owner", "guqing"); + reason.getSpec().setAttributes(reasonAttributes); + + when(subscriptionService.listByPerPage(anyString())) + .thenReturn(Flux.just(subscription1, subscription2)); + + recipientResolver.resolve(reason) + .as(StepVerifier::create) + .expectNext(new Subscriber(UserIdentity.of("guqing"), "guqing-subscription")) + .verifyComplete(); + + verify(subscriptionService).listByPerPage(anyString()); + } + + @Test + void testSubjectMatch() { + var subscriber = new Subscription.Subscriber(); + subscriber.setName("test"); + Subscription subscription = createSubscription(subscriber); + + when(subscriptionService.listByPerPage(anyString())) + .thenReturn(Flux.just(subscription)); + + var reason = new Reason(); + reason.setSpec(new Reason.Spec()); + reason.getSpec().setReasonType("new-comment-on-post"); + reason.getSpec().setSubject(new Reason.Subject()); + reason.getSpec().getSubject().setApiVersion("content.halo.run/v1alpha1"); + reason.getSpec().getSubject().setKind("Post"); + reason.getSpec().getSubject().setName("fake-post"); + + recipientResolver.resolve(reason) + .as(StepVerifier::create) + .expectNext(new Subscriber(UserIdentity.of("test"), "fake-subscription")) + .verifyComplete(); + + verify(subscriptionService).listByPerPage(anyString()); + } + + @Test + void distinct() { + // same subscriber to different subscriptions + var subscriber = new Subscription.Subscriber(); + subscriber.setName("test"); + + final var subscription1 = createSubscription(subscriber); + subscription1.getMetadata().setName("sub-1"); + + final var subscription2 = createSubscription(subscriber); + subscription2.getMetadata().setName("sub-2"); + subscription2.getSpec().getReason().setSubject(null); + subscription2.getSpec().getReason().setExpression("props.owner == 'guqing'"); + + when(subscriptionService.listByPerPage(anyString())) + .thenReturn(Flux.just(subscription1, subscription2)); + + var reason = new Reason(); + reason.setSpec(new Reason.Spec()); + reason.getSpec().setReasonType("new-comment-on-post"); + reason.getSpec().setSubject(new Reason.Subject()); + reason.getSpec().getSubject().setApiVersion("content.halo.run/v1alpha1"); + reason.getSpec().getSubject().setKind("Post"); + reason.getSpec().getSubject().setName("fake-post"); + var reasonAttributes = new ReasonAttributes(); + reasonAttributes.put("owner", "guqing"); + reason.getSpec().setAttributes(reasonAttributes); + + recipientResolver.resolve(reason) + .as(StepVerifier::create) + .expectNextCount(1) + .verifyComplete(); + + verify(subscriptionService).listByPerPage(anyString()); + } + + @Test + void subjectMatchTest() { + var subscriber = new Subscription.Subscriber(); + subscriber.setName("test"); + + final var subscription = createSubscription(subscriber); + + // match all name subscription + var subject = new Reason.Subject(); + subject.setApiVersion("content.halo.run/v1alpha1"); + subject.setKind("Post"); + subject.setName("fake-post"); + assertThat(RecipientResolverImpl.subjectMatch(subscription, subject)).isTrue(); + + // different kind + subject = new Reason.Subject(); + subject.setApiVersion("content.halo.run/v1alpha1"); + subject.setKind("SinglePage"); + subject.setName("fake-post"); + assertThat(RecipientResolverImpl.subjectMatch(subscription, subject)).isFalse(); + + // special case + subscription.getSpec().getReason().getSubject().setName("other-post"); + subject = new Reason.Subject(); + subject.setApiVersion("content.halo.run/v1alpha1"); + subject.setKind("Post"); + subject.setName("fake-post"); + assertThat(RecipientResolverImpl.subjectMatch(subscription, subject)).isFalse(); + subject.setName("other-post"); + assertThat(RecipientResolverImpl.subjectMatch(subscription, subject)).isTrue(); + } + + private static Subscription createSubscription(Subscription.Subscriber subscriber) { + Subscription subscription = new Subscription(); + subscription.setMetadata(new Metadata()); + subscription.getMetadata().setName("fake-subscription"); + subscription.setSpec(new Subscription.Spec()); + subscription.getSpec().setSubscriber(subscriber); + + var interestReason = new Subscription.InterestReason(); + interestReason.setReasonType("new-comment-on-post"); + interestReason.setSubject(new Subscription.ReasonSubject()); + interestReason.getSubject().setApiVersion("content.halo.run/v1alpha1"); + interestReason.getSubject().setKind("Post"); + subscription.getSpec().setReason(interestReason); + return subscription; + } +} diff --git a/application/src/test/java/run/halo/app/notification/SubscriptionServiceImplTest.java b/application/src/test/java/run/halo/app/notification/SubscriptionServiceImplTest.java new file mode 100644 index 000000000..07ffbc8c3 --- /dev/null +++ b/application/src/test/java/run/halo/app/notification/SubscriptionServiceImplTest.java @@ -0,0 +1,74 @@ +package run.halo.app.notification; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.dao.OptimisticLockingFailureException; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import run.halo.app.core.extension.notification.Subscription; +import run.halo.app.extension.Metadata; +import run.halo.app.extension.ReactiveExtensionClient; + +/** + * Tests for {@link SubscriptionServiceImpl}. + * + * @author guqing + * @since 2.15.0 + */ +@ExtendWith(MockitoExtension.class) +class SubscriptionServiceImplTest { + + @Mock + private ReactiveExtensionClient client; + + @InjectMocks + private SubscriptionServiceImpl subscriptionService; + + @Test + void remove() { + var i = new AtomicLong(1L); + when(client.delete(any(Subscription.class))).thenAnswer(invocation -> { + var subscription = (Subscription) invocation.getArgument(0); + if (i.get() != subscription.getMetadata().getVersion()) { + return Mono.error(new OptimisticLockingFailureException("fake-exception")); + } + return Mono.just(subscription); + }); + + var subscription = new Subscription(); + subscription.setMetadata(new Metadata()); + subscription.getMetadata().setName("fake-subscription"); + subscription.getMetadata().setVersion(0L); + + when(client.fetch(eq(Subscription.class), eq("fake-subscription"))) + .thenAnswer(invocation -> { + if (i.incrementAndGet() > 3) { + subscription.getMetadata().setVersion(i.get()); + } else { + subscription.getMetadata().setVersion(i.get() - 1); + } + return Mono.just(subscription); + }); + + subscriptionService.remove(subscription) + .as(StepVerifier::create) + .expectNextCount(1) + .verifyComplete(); + + // give version=0, but the real version is 1 + // give version=1, but the real version is 2 + // give version=2, but the real version is 3 + // give version=3, but the real version is 3 (delete success) + verify(client, times(3)).fetch(eq(Subscription.class), eq("fake-subscription")); + } +} \ No newline at end of file diff --git a/application/src/test/java/run/halo/app/notification/SubscriptionServiceIntegrationTest.java b/application/src/test/java/run/halo/app/notification/SubscriptionServiceIntegrationTest.java new file mode 100644 index 000000000..6a55506f7 --- /dev/null +++ b/application/src/test/java/run/halo/app/notification/SubscriptionServiceIntegrationTest.java @@ -0,0 +1,171 @@ +package run.halo.app.notification; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.verify; +import static run.halo.app.extension.index.query.QueryFactory.isNull; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.annotation.DirtiesContext; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import run.halo.app.core.extension.notification.Subscription; +import run.halo.app.extension.Extension; +import run.halo.app.extension.ExtensionStoreUtil; +import run.halo.app.extension.ListOptions; +import run.halo.app.extension.PageRequestImpl; +import run.halo.app.extension.ReactiveExtensionClient; +import run.halo.app.extension.SchemeManager; +import run.halo.app.extension.index.IndexerFactory; +import run.halo.app.extension.router.selector.FieldSelector; +import run.halo.app.extension.store.ReactiveExtensionStoreClient; +import run.halo.app.infra.utils.JsonUtils; + +/** + * Integration tests for {@link SubscriptionService}. + * + * @author guqing + * @since 2.15.0 + */ +@DirtiesContext +@SpringBootTest +class SubscriptionServiceIntegrationTest { + + @Autowired + private SchemeManager schemeManager; + + @SpyBean + private ReactiveExtensionClient client; + + @Autowired + private ReactiveExtensionStoreClient storeClient; + + @Autowired + private IndexerFactory indexerFactory; + + Mono deleteImmediately(Extension extension) { + var name = extension.getMetadata().getName(); + var scheme = schemeManager.get(extension.getClass()); + // un-index + var indexer = indexerFactory.getIndexer(extension.groupVersionKind()); + indexer.unIndexRecord(extension.getMetadata().getName()); + + // delete from db + var storeName = ExtensionStoreUtil.buildStoreName(scheme, name); + return storeClient.delete(storeName, extension.getMetadata().getVersion()) + .thenReturn(extension); + } + + @Nested + class RemoveInitialBatchTest { + static int size = 310; + private final List storedSubscriptions = subscriptionsForStore(); + + @Autowired + private SubscriptionService subscriptionService; + + @BeforeEach + void setUp() { + Flux.fromIterable(storedSubscriptions) + .flatMap(comment -> client.create(comment)) + .as(StepVerifier::create) + .expectNextCount(storedSubscriptions.size()) + .verifyComplete(); + } + + @AfterEach + void tearDown() { + Flux.fromIterable(storedSubscriptions) + .flatMap(SubscriptionServiceIntegrationTest.this::deleteImmediately) + .as(StepVerifier::create) + .expectNextCount(storedSubscriptions.size()) + .verifyComplete(); + } + + private List subscriptionsForStore() { + List subscriptions = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + var subscription = createSubscription(); + subscription.getMetadata().setName("subscription-" + i); + subscriptions.add(subscription); + } + return subscriptions; + } + + @Test + void removeTest() { + var subscriber = new Subscription.Subscriber(); + subscriber.setName("admin"); + var interestReason = new Subscription.InterestReason(); + interestReason.setReasonType("new-comment-on-post"); + var subject = new Subscription.ReasonSubject(); + subject.setApiVersion("content.halo.run/v1alpha1"); + subject.setKind("Post"); + interestReason.setSubject(subject); + + subscriptionService.remove(subscriber, interestReason).block(); + + verify(client, atLeast(size)).delete(any(Subscription.class)); + assertCleanedUp(); + } + + @Test + void removeBySubscriberTest() { + var subscriber = new Subscription.Subscriber(); + subscriber.setName("admin"); + + subscriptionService.remove(subscriber).block(); + verify(client, atLeast(size)).delete(any(Subscription.class)); + assertCleanedUp(); + } + + private void assertCleanedUp() { + var listOptions = new ListOptions(); + listOptions.setFieldSelector(FieldSelector.of(isNull("metadata.deletionTimestamp"))); + client.listBy(Subscription.class, listOptions, PageRequestImpl.ofSize(1)) + .as(StepVerifier::create) + .consumeNextWith(result -> { + assertThat(result.getTotal()).isEqualTo(0); + assertThat(result.getItems()).isEmpty(); + }) + .verifyComplete(); + } + } + + Subscription createSubscription() { + return JsonUtils.jsonToObject(""" + { + "spec": { + "subscriber": { + "name": "admin" + }, + "unsubscribeToken": "423530c9-bec7-446e-b73b-dd98ac00ba2b", + "reason": { + "reasonType": "new-comment-on-post", + "subject": { + "name": "5152aea5-c2e8-4717-8bba-2263d46e19d5", + "apiVersion": "content.halo.run/v1alpha1", + "kind": "Post" + } + }, + "disabled": false + }, + "apiVersion": "notification.halo.run/v1alpha1", + "kind": "Subscription", + "metadata": { + "generateName": "subscription-" + } + } + """, Subscription.class); + } +} \ No newline at end of file diff --git a/docs/notification/README.md b/docs/notification/README.md index f672332e9..1408ac18f 100644 --- a/docs/notification/README.md +++ b/docs/notification/README.md @@ -19,7 +19,8 @@ 设计一个通知功能,可以根据以下目标,实现订阅和推送通知: - 支持扩展多种通知方式,例如邮件、短信、Slack 等。 -- 支持通知条件并可扩展,例如 Halo 有新文章发布事件如果用户订阅了新文章发布事件但付费订阅插件决定了此文章只有付费用户才可收到通知、按照付费等级不同决定是否发送新文章通知给对应用户等需要通过实现通知条件的扩展点来满足对应需求。 +- 支持通知条件并可扩展,例如 Halo + 有新文章发布事件如果用户订阅了新文章发布事件但付费订阅插件决定了此文章只有付费用户才可收到通知、按照付费等级不同决定是否发送新文章通知给对应用户等需要通过实现通知条件的扩展点来满足对应需求。 - 支持定制化选项,例如是否开启通知、通知时段等。 - 支持通知流程,例如通知的发送、接收、查看、标记等。 - 通知内容支持多语言。 @@ -97,7 +98,8 @@ spec: #### Subscription -`Subscription` 自定义模型,定义了特定事件时与要被通知的订阅者之间的关系, 其中 `subscriber` 表示订阅者用户, `unsubscribeToken` 表示退订时的身份验证 token, `reason` 订阅者感兴趣的事件。 +`Subscription` 自定义模型,定义了特定事件时与要被通知的订阅者之间的关系, 其中 `subscriber` +表示订阅者用户, `unsubscribeToken` 表示退订时的身份验证 token, `reason` 订阅者感兴趣的事件。 用户可以通过 `Subscription` 来订阅自己感兴趣的事件,当事件触发时会收到通知: @@ -116,13 +118,24 @@ spec: apiVersion: content.halo.run/v1alpha1 kind: Post name: 'post-axgu' + # expression: 'props.owner == "guqing"' ``` -订阅退订链接 API 规则:`/apis/api.notification.halo.run/v1alpha1/subscriptions/{name}/unsubscribe?token={unsubscribeToken}`。 +- `spec.reason.subject`:用于根据事件的主体的匹配感兴趣的事件,如果不指定 name 则表示匹配主体与 kind 和 apiVersion + 相同的一类事件。 +- `spec.expression`:根据表达式匹配感兴趣的事件,例如 `props.owner == "guqing"` 表示只有当事件的属性(reason attributes)的 + owner 等于 guqing 时才会触发通知。表达式符合 SpEL + 表达式语法,但结果只能是布尔值。参考:[增强 Subscription 模型以支持表达式匹配](https://github.com/halo-dev/halo/issues/5632) + +> 当 `spec.expression` 和 `spec.reason.subject` 同时存在时,以 `spec.reason.subject` 的结果为准,不建议同时使用。 + +订阅退订链接 API +规则:`/apis/api.notification.halo.run/v1alpha1/subscriptions/{name}/unsubscribe?token={unsubscribeToken}`。 #### 用户通知偏好设置 -通过在用户偏好设置的 ConfigMap 中存储一个 `notification` key 用于保存事件类型与通知方式的关系设置,当用户订阅了如 'new-comment-on-post' 事件时会获取对应的通知方式来给用户发送通知。 +通过在用户偏好设置的 ConfigMap 中存储一个 `notification` key 用于保存事件类型与通知方式的关系设置,当用户订阅了如 ' +new-comment-on-post' 事件时会获取对应的通知方式来给用户发送通知。 ```yaml apiVersion: v1alpha1 @@ -153,7 +166,8 @@ data: #### Notification 站内通知 -当用户订阅到事件后会创建 `Notification`, 它与通知方式(notifier)无关,`recipient` 为用户名,类似站内通知,如用户 `guqing` 订阅了评论事件那么当监听到评论事件时会创建一条记录可以在个人中心的通知列表看到一条通知消息。 +当用户订阅到事件后会创建 `Notification`, 它与通知方式(notifier)无关,`recipient` 为用户名,类似站内通知,如用户 `guqing` +订阅了评论事件那么当监听到评论事件时会创建一条记录可以在个人中心的通知列表看到一条通知消息。 ```yaml apiVersion: notification.halo.run/v1alpha1 @@ -177,6 +191,7 @@ spec: `GET /apis/api.notification.halo.run/v1alpha1/userspaces/{username}/notifications` 2. 将通知标记为已读:`PUT /apis/api.notification.halo.run/v1alpha1/userspaces/{username}/notifications/mark-as-read` 3. + 批量将通知标记为已读:`PUT /apis/api.notification.halo.run/v1alpha1/userspaces/{username}/notifications/mark-specified-as-read` #### 通知模板 @@ -185,14 +200,18 @@ spec: 它通过定义 `reasonSelector` 来引用事件类别,当事件触发时会根据用户的语言偏好和触发事件的类别来选择一个最佳的通知模板。 选择通知模板的规则为: -1. 根据用户设置的语言,选择从通知模板中定义的 `spec.reasonSelector.language` 的值从更具体到不太具体的顺序(例如,gl_ES 的值将比 gl 的值具有更高的优先级)。 -2. 当通过语言成功匹配到模板时,匹配到的结果可能不止一个,如 `language` 为 `zh_CN` 的模板有三个那么会根据 `NotificationTemplate` 的 `metadata.creationTimestamp` 字段来选择一个最新的模板。 +1. 根据用户设置的语言,选择从通知模板中定义的 `spec.reasonSelector.language` 的值从更具体到不太具体的顺序(例如,gl_ES 的值将比 + gl 的值具有更高的优先级)。 +2. 当通过语言成功匹配到模板时,匹配到的结果可能不止一个,如 `language` 为 `zh_CN` + 的模板有三个那么会根据 `NotificationTemplate` 的 `metadata.creationTimestamp` 字段来选择一个最新的模板。 这样的规则有助于用户可以个性化定制某些事件的模板内容。 -模板语法使用 ThymeleafEngine 渲染,纯文本模板使用 `textual` 模板模式,语法参考: [usingthymeleaf.html#textual-syntax](https://www.thymeleaf.org/doc/tutorials/3.1/usingthymeleaf.html#textual-syntax) +模板语法使用 ThymeleafEngine 渲染,纯文本模板使用 `textual` +模板模式,语法参考: [usingthymeleaf.html#textual-syntax](https://www.thymeleaf.org/doc/tutorials/3.1/usingthymeleaf.html#textual-syntax) -`HTML` 则使用标准表达式语法在标签属性中取值,语法参考:[standard-expression-syntax](https://www.thymeleaf.org/doc/tutorials/3.1/usingthymeleaf.html#standard-expression-syntax) +`HTML` +则使用标准表达式语法在标签属性中取值,语法参考:[standard-expression-syntax](https://www.thymeleaf.org/doc/tutorials/3.1/usingthymeleaf.html#standard-expression-syntax) 在通知中心渲染模板时会在 `ReasonAttributes` 中提供额外属性包括: @@ -224,11 +243,12 @@ spec: #### 通知器声明及扩展 -`NotifierDescriptor` 自定义模型用于声明通知器,通过它来描述通知器的名称、描述和关联的 `ExtensionDefinition` 名称,让用户可以在用户界面知道通知器是什么以及它可以做什么, +`NotifierDescriptor` 自定义模型用于声明通知器,通过它来描述通知器的名称、描述和关联的 `ExtensionDefinition` +名称,让用户可以在用户界面知道通知器是什么以及它可以做什么, 还让 NotificationCenter 知道如何加载通知器和准备通知器需要的设置以发送通知。 ```yaml -apiVersion: notification.halo.run/v1alpha1 +apiVersion: notification.halo.run/v1alpha1 kind: NotifierDescriptor metadata: name: email-notifier @@ -261,52 +281,52 @@ spec: ```java public interface ReactiveNotifier extends ExtensionPoint { - /** - * Notify user. - * - * @param context notification context must not be null - */ - Mono notify(NotificationContext context); + /** + * Notify user. + * + * @param context notification context must not be null + */ + Mono notify(NotificationContext context); } @Data public class NotificationContext { - private Message message; + private Message message; - private ObjectNode receiverConfig; + private ObjectNode receiverConfig; - private ObjectNode senderConfig; + private ObjectNode senderConfig; - @Data - static class Message { - private MessagePayload payload; + @Data + static class Message { + private MessagePayload payload; - private Subject subject; + private Subject subject; - private String recipient; + private String recipient; - private Instant timestamp; - } + private Instant timestamp; + } - @Data - public static class Subject { - private String apiVersion; - private String kind; - private String name; - private String title; - private String url; - } + @Data + public static class Subject { + private String apiVersion; + private String kind; + private String name; + private String title; + private String url; + } - @Data - static class MessagePayload { - private String title; + @Data + static class MessagePayload { + private String title; - private String rawBody; - - private String htmlBody; + private String rawBody; - private ReasonAttributes attributes; - } + private String htmlBody; + + private ReasonAttributes attributes; + } } ``` diff --git a/ui/packages/api-client/src/models/interest-reason.ts b/ui/packages/api-client/src/models/interest-reason.ts index d1c3b7761..50b00c10b 100644 --- a/ui/packages/api-client/src/models/interest-reason.ts +++ b/ui/packages/api-client/src/models/interest-reason.ts @@ -23,6 +23,12 @@ import { InterestReasonSubject } from './interest-reason-subject'; * @interface InterestReason */ export interface InterestReason { + /** + * The expression to be interested in + * @type {string} + * @memberof InterestReason + */ + 'expression'?: string; /** * The name of the reason definition to be interested in * @type {string}