feat: subscription support for expression-based subscribing (#5705)

#### What type of PR is this?
/kind feature
/area core
/milestone 2.15.x

#### What this PR does / why we need it:
通知订阅支持基于表达式订阅

see #5632 for more details

how to test it?
1. 测试系统通知功能的文章、页面有新评论通知和评论有新回复通知的功能是否正常
2. 测试 2.14 创建的文章、评论和回复升级到此版本后是否能继续收到相应通知,如文章有新评论

#### Which issue(s) this PR fixes:
Fixes #5632

#### Does this PR introduce a user-facing change?
```release-note
通知订阅支持基于表达式订阅避免订阅随数据量增长同时自动优化之前的订阅数据
```
pull/5818/head
guqing 2024-04-26 18:26:41 +08:00 committed by GitHub
parent 58f82d2cc2
commit 0e17d53ede
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 1422 additions and 521 deletions

View File

@ -15003,6 +15003,10 @@
], ],
"type": "object", "type": "object",
"properties": { "properties": {
"expression": {
"type": "string",
"description": "The expression to be interested in"
},
"reasonType": { "reasonType": {
"type": "string", "type": "string",
"description": "The name of the reason definition to be interested in" "description": "The name of the reason definition to be interested in"

View File

@ -11,7 +11,6 @@ import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.ToString;
import run.halo.app.extension.AbstractExtension; import run.halo.app.extension.AbstractExtension;
import run.halo.app.extension.GVK; import run.halo.app.extension.GVK;
@ -59,6 +58,44 @@ public class Subscription extends AbstractExtension {
@Schema(requiredMode = REQUIRED, description = "The subject name of reason type to be" @Schema(requiredMode = REQUIRED, description = "The subject name of reason type to be"
+ " interested in") + " interested in")
private ReasonSubject subject; private ReasonSubject subject;
@Schema(requiredMode = NOT_REQUIRED, description = "The expression to be interested in")
private String expression;
/**
* <p>Since 2.15.0, we have added a new field <code>expression</code> to the
* <code>InterestReason</code> object, so <code>subject</code> can be null.</p>
* <p>In this particular scenario, when the <code>subject</code> is null, we assign it a
* default <code>ReasonSubject</code> object. The properties of this object are set to
* specific values that do not occur in actual applications, thus we can consider this as
* <code>nonexistent data</code>.
* The purpose of this approach is to maintain backward compatibility, even if the
* <code>subject</code> can be null in the new version of the code.</p>
*/
public static void ensureSubjectHasValue(InterestReason interestReason) {
if (interestReason.getSubject() == null) {
interestReason.setSubject(createFallbackSubject());
}
}
/**
* Check if the given reason subject is a fallback subject.
*/
public static boolean isFallbackSubject(ReasonSubject reasonSubject) {
if (reasonSubject == null) {
return true;
}
var fallback = createFallbackSubject();
return fallback.getKind().equals(reasonSubject.getKind())
&& fallback.getApiVersion().equals(reasonSubject.getApiVersion());
}
static ReasonSubject createFallbackSubject() {
return ReasonSubject.builder()
.apiVersion("notification.halo.run/v1alpha1")
.kind("NonexistentKind")
.build();
}
} }
@Data @Data
@ -85,10 +122,14 @@ public class Subscription extends AbstractExtension {
} }
@Data @Data
@ToString
@Schema(name = "SubscriptionSubscriber") @Schema(name = "SubscriptionSubscriber")
public static class Subscriber { public static class Subscriber {
private String name; private String name;
@Override
public String toString() {
return name;
}
} }
/** /**

View File

@ -1,6 +1,7 @@
package run.halo.app.content.comment; package run.halo.app.content.comment;
import static org.apache.commons.lang3.StringUtils.defaultIfBlank; import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
import static run.halo.app.content.comment.ReplyNotificationSubscriptionHelper.identityFrom;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Map; import java.util.Map;
@ -29,7 +30,6 @@ import run.halo.app.extension.Ref;
import run.halo.app.infra.ExternalLinkProcessor; import run.halo.app.infra.ExternalLinkProcessor;
import run.halo.app.infra.utils.JsonUtils; import run.halo.app.infra.utils.JsonUtils;
import run.halo.app.notification.NotificationReasonEmitter; import run.halo.app.notification.NotificationReasonEmitter;
import run.halo.app.notification.UserIdentity;
import run.halo.app.plugin.ExtensionComponentsFinder; import run.halo.app.plugin.ExtensionComponentsFinder;
import run.halo.app.plugin.extensionpoint.ExtensionGetter; import run.halo.app.plugin.extensionpoint.ExtensionGetter;
@ -114,6 +114,7 @@ public class CommentNotificationReasonPublisher {
builder -> { builder -> {
var attributes = CommentOnPostReasonData.builder() var attributes = CommentOnPostReasonData.builder()
.postName(subjectRef.getName()) .postName(subjectRef.getName())
.postOwner(post.getSpec().getOwner())
.postTitle(post.getSpec().getTitle()) .postTitle(post.getSpec().getTitle())
.postUrl(postUrl) .postUrl(postUrl)
.commenter(owner.getDisplayName()) .commenter(owner.getDisplayName())
@ -144,8 +145,9 @@ public class CommentNotificationReasonPublisher {
} }
@Builder @Builder
record CommentOnPostReasonData(String postName, String postTitle, String postUrl, record CommentOnPostReasonData(String postName, String postOwner, String postTitle,
String commenter, String content, String commentName) { String postUrl, String commenter, String content,
String commentName) {
} }
} }
@ -180,6 +182,7 @@ public class CommentNotificationReasonPublisher {
builder -> { builder -> {
var attributes = CommentOnPageReasonData.builder() var attributes = CommentOnPageReasonData.builder()
.pageName(subjectRef.getName()) .pageName(subjectRef.getName())
.pageOwner(singlePage.getSpec().getOwner())
.pageTitle(singlePage.getSpec().getTitle()) .pageTitle(singlePage.getSpec().getTitle())
.pageUrl(pageUrl) .pageUrl(pageUrl)
.commenter(defaultIfBlank(owner.getDisplayName(), owner.getName())) .commenter(defaultIfBlank(owner.getDisplayName(), owner.getName()))
@ -210,8 +213,9 @@ public class CommentNotificationReasonPublisher {
} }
@Builder @Builder
record CommentOnPageReasonData(String pageName, String pageTitle, String pageUrl, record CommentOnPageReasonData(String pageName, String pageOwner, String pageTitle,
String commenter, String content, String commentName) { String pageUrl, String commenter, String content,
String commentName) {
} }
} }
@ -224,13 +228,6 @@ public class CommentNotificationReasonPublisher {
} }
} }
static UserIdentity identityFrom(Comment.CommentOwner owner) {
if (Comment.CommentOwner.KIND_EMAIL.equals(owner.getKind())) {
return UserIdentity.anonymousWithEmail(owner.getName());
}
return UserIdentity.of(owner.getName());
}
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
static class NewReplyReasonPublisher { static class NewReplyReasonPublisher {
@ -272,6 +269,10 @@ public class CommentNotificationReasonPublisher {
.orElse(null); .orElse(null);
var replyOwner = reply.getSpec().getOwner(); var replyOwner = reply.getSpec().getOwner();
var repliedOwner = quoteReplyOptional
.map(quoteReply -> quoteReply.getSpec().getOwner())
.orElseGet(() -> comment.getSpec().getOwner());
var reasonAttributesBuilder = NewReplyReasonData.builder() var reasonAttributesBuilder = NewReplyReasonData.builder()
.commentContent(comment.getSpec().getContent()) .commentContent(comment.getSpec().getContent())
.isQuoteReply(isQuoteReply) .isQuoteReply(isQuoteReply)
@ -279,7 +280,9 @@ public class CommentNotificationReasonPublisher {
.commentName(comment.getMetadata().getName()) .commentName(comment.getMetadata().getName())
.replier(defaultIfBlank(replyOwner.getDisplayName(), replyOwner.getName())) .replier(defaultIfBlank(replyOwner.getDisplayName(), replyOwner.getName()))
.content(reply.getSpec().getContent()) .content(reply.getSpec().getContent())
.replyName(reply.getMetadata().getName()); .replyName(reply.getMetadata().getName())
.replyOwner(identityFrom(replyOwner).name())
.repliedOwner(identityFrom(repliedOwner).name());
getCommentSubjectDisplay(comment.getSpec().getSubjectRef()) getCommentSubjectDisplay(comment.getSpec().getSubjectRef())
.ifPresent(subject -> { .ifPresent(subject -> {
@ -337,7 +340,7 @@ public class CommentNotificationReasonPublisher {
String commentSubjectUrl, boolean isQuoteReply, String commentSubjectUrl, boolean isQuoteReply,
String quoteContent, String quoteContent,
String commentName, String replier, String content, String commentName, String replier, String content,
String replyName) { String replyName, String replyOwner, String repliedOwner) {
} }
} }
} }

View File

@ -9,7 +9,7 @@ import run.halo.app.core.extension.content.Comment;
import run.halo.app.core.extension.content.Reply; import run.halo.app.core.extension.content.Reply;
import run.halo.app.core.extension.notification.Subscription; import run.halo.app.core.extension.notification.Subscription;
import run.halo.app.notification.NotificationCenter; import run.halo.app.notification.NotificationCenter;
import run.halo.app.notification.SubscriberEmailResolver; import run.halo.app.notification.UserIdentity;
/** /**
* Reply notification subscription helper. * Reply notification subscription helper.
@ -22,7 +22,6 @@ import run.halo.app.notification.SubscriberEmailResolver;
public class ReplyNotificationSubscriptionHelper { public class ReplyNotificationSubscriptionHelper {
private final NotificationCenter notificationCenter; private final NotificationCenter notificationCenter;
private final SubscriberEmailResolver subscriberEmailResolver;
/** /**
* Subscribe new reply reason for comment. * Subscribe new reply reason for comment.
@ -30,13 +29,7 @@ public class ReplyNotificationSubscriptionHelper {
* @param comment comment * @param comment comment
*/ */
public void subscribeNewReplyReasonForComment(Comment comment) { public void subscribeNewReplyReasonForComment(Comment comment) {
var reasonSubject = Subscription.ReasonSubject.builder() subscribeReply(identityFrom(comment.getSpec().getOwner()));
.apiVersion(comment.getApiVersion())
.kind(comment.getKind())
.name(comment.getMetadata().getName())
.build();
subscribeReply(reasonSubject,
Identity.fromCommentOwner(comment.getSpec().getOwner()));
} }
/** /**
@ -45,50 +38,36 @@ public class ReplyNotificationSubscriptionHelper {
* @param reply reply * @param reply reply
*/ */
public void subscribeNewReplyReasonForReply(Reply reply) { public void subscribeNewReplyReasonForReply(Reply reply) {
var reasonSubject = Subscription.ReasonSubject.builder()
.apiVersion(reply.getApiVersion())
.kind(reply.getKind())
.name(reply.getMetadata().getName())
.build();
var subjectOwner = reply.getSpec().getOwner(); var subjectOwner = reply.getSpec().getOwner();
subscribeReply(reasonSubject, subscribeReply(identityFrom(subjectOwner));
Identity.fromCommentOwner(subjectOwner));
} }
void subscribeReply(Subscription.ReasonSubject reasonSubject, void subscribeReply(UserIdentity identity) {
Identity identity) {
var subscriber = createSubscriber(identity); var subscriber = createSubscriber(identity);
if (subscriber == null) { if (subscriber == null) {
return; return;
} }
var interestReason = new Subscription.InterestReason(); var interestReason = new Subscription.InterestReason();
interestReason.setReasonType(NotificationReasonConst.SOMEONE_REPLIED_TO_YOU); interestReason.setReasonType(NotificationReasonConst.SOMEONE_REPLIED_TO_YOU);
interestReason.setSubject(reasonSubject); interestReason.setExpression("props.repliedOwner == '%s'".formatted(identity.name()));
notificationCenter.subscribe(subscriber, interestReason).block(); notificationCenter.subscribe(subscriber, interestReason).block();
} }
@Nullable @Nullable
private Subscription.Subscriber createSubscriber(Identity author) { private Subscription.Subscriber createSubscriber(UserIdentity author) {
if (StringUtils.isBlank(author.name())) { if (StringUtils.isBlank(author.name())) {
return null; return null;
} }
Subscription.Subscriber subscriber; Subscription.Subscriber subscriber = new Subscription.Subscriber();
if (author.isEmail()) { subscriber.setName(author.name());
subscriber = subscriberEmailResolver.ofEmail(author.name());
} else {
subscriber = new Subscription.Subscriber();
subscriber.setName(author.name());
}
return subscriber; return subscriber;
} }
record Identity(String name, boolean isEmail) { public static UserIdentity identityFrom(Comment.CommentOwner owner) {
public static Identity fromCommentOwner(Comment.CommentOwner commentOwner) { if (Comment.CommentOwner.KIND_EMAIL.equals(owner.getKind())) {
if (Comment.CommentOwner.KIND_EMAIL.equals(commentOwner.getKind())) { return UserIdentity.anonymousWithEmail(owner.getName());
return new Identity(commentOwner.getName(), true);
}
return new Identity(commentOwner.getName(), false);
} }
return UserIdentity.of(owner.getName());
} }
} }

View File

@ -67,12 +67,11 @@ public class CommentReconciler implements Reconciler<Reconciler.Request> {
return; return;
} }
if (addFinalizers(comment.getMetadata(), Set.of(FINALIZER_NAME))) { if (addFinalizers(comment.getMetadata(), Set.of(FINALIZER_NAME))) {
replyNotificationSubscriptionHelper.subscribeNewReplyReasonForComment(comment);
client.update(comment); client.update(comment);
eventPublisher.publishEvent(new CommentCreatedEvent(this, comment)); eventPublisher.publishEvent(new CommentCreatedEvent(this, comment));
} }
replyNotificationSubscriptionHelper.subscribeNewReplyReasonForComment(comment);
compatibleCreationTime(comment); compatibleCreationTime(comment);
Comment.CommentStatus status = comment.getStatusOrDefault(); Comment.CommentStatus status = comment.getStatusOrDefault();
status.setHasNewReply(defaultIfNull(status.getUnreadReplyCount(), 0) > 0); status.setHasNewReply(defaultIfNull(status.getUnreadReplyCount(), 0) > 0);

View File

@ -246,11 +246,8 @@ public class PostReconciler implements Reconciler<Reconciler.Request> {
var interestReason = new Subscription.InterestReason(); var interestReason = new Subscription.InterestReason();
interestReason.setReasonType(NotificationReasonConst.NEW_COMMENT_ON_POST); interestReason.setReasonType(NotificationReasonConst.NEW_COMMENT_ON_POST);
interestReason.setSubject(Subscription.ReasonSubject.builder() interestReason.setExpression(
.apiVersion(post.getApiVersion()) "props.postOwner == '%s'".formatted(post.getSpec().getOwner()));
.kind(post.getKind())
.name(post.getMetadata().getName())
.build());
notificationCenter.subscribe(subscriber, interestReason).block(); notificationCenter.subscribe(subscriber, interestReason).block();
} }

View File

@ -46,6 +46,7 @@ public class ReplyReconciler implements Reconciler<Reconciler.Request> {
return; return;
} }
if (addFinalizers(reply.getMetadata(), Set.of(FINALIZER_NAME))) { if (addFinalizers(reply.getMetadata(), Set.of(FINALIZER_NAME))) {
replyNotificationSubscriptionHelper.subscribeNewReplyReasonForReply(reply);
client.update(reply); client.update(reply);
eventPublisher.publishEvent(new ReplyCreatedEvent(this, reply)); eventPublisher.publishEvent(new ReplyCreatedEvent(this, reply));
} }
@ -64,8 +65,6 @@ public class ReplyReconciler implements Reconciler<Reconciler.Request> {
client.update(reply); client.update(reply);
replyNotificationSubscriptionHelper.subscribeNewReplyReasonForReply(reply);
eventPublisher.publishEvent(new ReplyChangedEvent(this, reply)); eventPublisher.publishEvent(new ReplyChangedEvent(this, reply));
}); });
return new Result(false, null); return new Result(false, null);

View File

@ -107,11 +107,8 @@ public class SinglePageReconciler implements Reconciler<Reconciler.Request> {
var interestReason = new Subscription.InterestReason(); var interestReason = new Subscription.InterestReason();
interestReason.setReasonType(NotificationReasonConst.NEW_COMMENT_ON_PAGE); interestReason.setReasonType(NotificationReasonConst.NEW_COMMENT_ON_PAGE);
interestReason.setSubject(Subscription.ReasonSubject.builder() interestReason.setExpression(
.apiVersion(page.getApiVersion()) "props.pageOwner == '%s'".formatted(page.getSpec().getOwner()));
.kind(page.getKind())
.name(page.getMetadata().getName())
.build());
notificationCenter.subscribe(subscriber, interestReason).block(); notificationCenter.subscribe(subscriber, interestReason).block();
} }

View File

@ -0,0 +1,37 @@
package run.halo.app.infra;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import run.halo.app.extension.Extension;
import run.halo.app.extension.ListOptions;
/**
* Reactive extension paginated operator to handle extensions by pagination.
*
* @author guqing
* @since 2.15.0
*/
public interface ReactiveExtensionPaginatedOperator {
/**
* <p>Deletes all data, including any new entries added during the execution of this method.</p>
* <p>This method continuously monitors and removes data that appears throughout its runtime,
* ensuring that even data created during the deletion process is also removed.</p>
*/
<E extends Extension> Mono<Void> deleteContinuously(Class<E> type,
ListOptions listOptions);
/**
* <p>Deletes only the data that existed at the start of the operation.</p>
* <p>This method takes a snapshot of the data at the beginning and deletes only that dataset;
* any data added after the method starts will not be affected or removed.</p>
*/
<E extends Extension> Flux<E> deleteInitialBatch(Class<E> type,
ListOptions listOptions);
/**
* <p>Note that: This method can not be used for <code>deletion</code> operation, because
* deletion operation will change the total records.</p>
*/
<E extends Extension> Flux<E> list(Class<E> type, ListOptions listOptions);
}

View File

@ -0,0 +1,132 @@
package run.halo.app.infra;
import static run.halo.app.extension.index.query.QueryFactory.isNull;
import java.time.Duration;
import java.time.Instant;
import lombok.RequiredArgsConstructor;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import run.halo.app.extension.Extension;
import run.halo.app.extension.ListOptions;
import run.halo.app.extension.ListResult;
import run.halo.app.extension.PageRequest;
import run.halo.app.extension.PageRequestImpl;
import run.halo.app.extension.ReactiveExtensionClient;
@Component
@RequiredArgsConstructor
public class ReactiveExtensionPaginatedOperatorImpl implements ReactiveExtensionPaginatedOperator {
private static final int DEFAULT_PAGE_SIZE = 200;
private final ReactiveExtensionClient client;
@Override
public <E extends Extension> Mono<Void> deleteContinuously(Class<E> type,
ListOptions listOptions) {
var pageRequest = createPageRequest();
return cleanupContinuously(type, listOptions, pageRequest);
}
private <E extends Extension> Mono<Void> cleanupContinuously(Class<E> type,
ListOptions listOptions,
PageRequest pageRequest) {
// forever loop first page until no more to delete
return pageBy(type, listOptions, pageRequest)
.flatMap(page -> Flux.fromIterable(page.getItems())
.flatMap(client::delete)
.then(page.hasNext() ? cleanupContinuously(type, listOptions, pageRequest)
: Mono.empty())
);
}
@Override
public <E extends Extension> Flux<E> deleteInitialBatch(Class<E> type,
ListOptions listOptions) {
var pageRequest = createPageRequest();
var newFieldQuery = listOptions.getFieldSelector()
.andQuery(isNull("metadata.deletionTimestamp"));
listOptions.setFieldSelector(newFieldQuery);
final Instant now = Instant.now();
return pageBy(type, listOptions, pageRequest)
// forever loop first page until no more to delete
.expand(result -> result.hasNext()
? pageBy(type, listOptions, pageRequest) : Mono.empty())
.flatMap(result -> Flux.fromIterable(result.getItems()))
.takeWhile(item -> shouldTakeNext(item, now))
.flatMap(this::deleteWithRetry);
}
static <E extends Extension> boolean shouldTakeNext(E item, Instant now) {
var creationTimestamp = item.getMetadata().getCreationTimestamp();
return creationTimestamp.isBefore(now)
|| creationTimestamp.equals(now);
}
@SuppressWarnings("unchecked")
<E extends Extension> Mono<E> deleteWithRetry(E item) {
return client.delete(item)
.onErrorResume(OptimisticLockingFailureException.class,
e -> attemptToDelete((Class<E>) item.getClass(), item.getMetadata().getName()));
}
private <E extends Extension> Mono<E> attemptToDelete(Class<E> type, String name) {
return Mono.defer(() -> client.fetch(type, name)
.flatMap(client::delete)
)
.retryWhen(Retry.backoff(8, Duration.ofMillis(100))
.filter(OptimisticLockingFailureException.class::isInstance));
}
@Override
public <E extends Extension> Flux<E> list(Class<E> type, ListOptions listOptions) {
var pageRequest = createPageRequest();
return list(type, listOptions, pageRequest);
}
/**
* Paginated list all items to avoid memory overflow.
* <pre>
* 1. Retrieve data multiple times until all data is consumed.
* 2. Fetch next page if current page has more data and consumed records is less than total
* records.
* 3. Take while consumed records is less than total records.
* 4. totalRecords from first page to ensure new inserted data will not be counted in during
* querying to avoid infinite loop.
* </pre>
*/
private <E extends Extension> Flux<E> list(Class<E> type, ListOptions listOptions,
PageRequest pageRequest) {
final var now = Instant.now();
return pageBy(type, listOptions, pageRequest)
.expand(result -> {
if (result.hasNext()) {
// fetch next page
var nextPage = nextPage(result, pageRequest.getSort());
return pageBy(type, listOptions, nextPage);
} else {
return Mono.empty();
}
})
.flatMap(page -> Flux.fromIterable(page.getItems()))
.takeWhile(item -> shouldTakeNext(item, now));
}
static <E extends Extension> PageRequest nextPage(ListResult<E> result, Sort sort) {
return PageRequestImpl.of(result.getPage() + 1, result.getSize(), sort);
}
private PageRequest createPageRequest() {
return PageRequestImpl.of(1, DEFAULT_PAGE_SIZE,
Sort.by("metadata.creationTimestamp", "metadata.name"));
}
private <E extends Extension> Mono<ListResult<E>> pageBy(Class<E> type, ListOptions listOptions,
PageRequest pageRequest) {
return client.listBy(type, listOptions, pageRequest);
}
}

View File

@ -438,6 +438,11 @@ public class SchemeInitializer implements ApplicationListener<ApplicationContext
.setIndexFunc(simpleAttribute(Subscription.class, .setIndexFunc(simpleAttribute(Subscription.class,
subscription -> subscription.getSpec().getReason().getSubject().toString())) subscription -> subscription.getSpec().getReason().getSubject().toString()))
); );
indexSpecs.add(new IndexSpec()
.setName("spec.reason.expression")
.setIndexFunc(simpleAttribute(Subscription.class,
subscription -> subscription.getSpec().getReason().getExpression()))
);
indexSpecs.add(new IndexSpec() indexSpecs.add(new IndexSpec()
.setName("spec.subscriber") .setName("spec.subscriber")
.setIndexFunc(simpleAttribute(Subscription.class, .setIndexFunc(simpleAttribute(Subscription.class,

View File

@ -1,23 +1,14 @@
package run.halo.app.notification; package run.halo.app.notification;
import static org.apache.commons.lang3.StringUtils.defaultString; import static org.apache.commons.lang3.StringUtils.defaultString;
import static run.halo.app.extension.index.query.QueryFactory.and;
import static run.halo.app.extension.index.query.QueryFactory.equal;
import static run.halo.app.extension.index.query.QueryFactory.or;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale; import java.util.Locale;
import java.util.Optional; import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.function.Function;
import lombok.Builder; import lombok.Builder;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
@ -27,15 +18,8 @@ import run.halo.app.core.extension.notification.NotifierDescriptor;
import run.halo.app.core.extension.notification.Reason; import run.halo.app.core.extension.notification.Reason;
import run.halo.app.core.extension.notification.ReasonType; import run.halo.app.core.extension.notification.ReasonType;
import run.halo.app.core.extension.notification.Subscription; import run.halo.app.core.extension.notification.Subscription;
import run.halo.app.extension.GroupVersionKind;
import run.halo.app.extension.ListOptions;
import run.halo.app.extension.ListResult;
import run.halo.app.extension.Metadata; import run.halo.app.extension.Metadata;
import run.halo.app.extension.PageRequest;
import run.halo.app.extension.PageRequestImpl;
import run.halo.app.extension.ReactiveExtensionClient; import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.index.query.Query;
import run.halo.app.extension.router.selector.FieldSelector;
import run.halo.app.notification.endpoint.SubscriptionRouter; import run.halo.app.notification.endpoint.SubscriptionRouter;
/** /**
@ -55,37 +39,33 @@ public class DefaultNotificationCenter implements NotificationCenter {
private final UserNotificationPreferenceService userNotificationPreferenceService; private final UserNotificationPreferenceService userNotificationPreferenceService;
private final NotificationTemplateRender notificationTemplateRender; private final NotificationTemplateRender notificationTemplateRender;
private final SubscriptionRouter subscriptionRouter; private final SubscriptionRouter subscriptionRouter;
private final RecipientResolver recipientResolver;
private final SubscriptionService subscriptionService;
@Override @Override
public Mono<Void> notify(Reason reason) { public Mono<Void> notify(Reason reason) {
var reasonSubject = reason.getSpec().getSubject(); return recipientResolver.resolve(reason)
var subscriptionReasonSubject = Subscription.ReasonSubject.builder() .doOnNext(subscriber -> {
.apiVersion(reasonSubject.getApiVersion())
.kind(reasonSubject.getKind())
.name(reasonSubject.getName())
.build();
return listObservers(reason.getSpec().getReasonType(), subscriptionReasonSubject)
.doOnNext(subscription -> {
log.debug("Dispatching notification to subscriber [{}] for reason [{}]", log.debug("Dispatching notification to subscriber [{}] for reason [{}]",
subscription.getSpec().getSubscriber(), reason.getMetadata().getName()); subscriber, reason.getMetadata().getName());
}) })
.publishOn(Schedulers.boundedElastic()) .publishOn(Schedulers.boundedElastic())
.flatMap(subscription -> dispatchNotification(reason, subscription)) .flatMap(subscriber -> dispatchNotification(reason, subscriber))
.then(); .then();
} }
@Override @Override
public Mono<Subscription> subscribe(Subscription.Subscriber subscriber, public Mono<Subscription> subscribe(Subscription.Subscriber subscriber,
Subscription.InterestReason reason) { Subscription.InterestReason reason) {
return listSubscription(subscriber, reason) return unsubscribe(subscriber, reason)
.next() .then(Mono.defer(() -> {
.switchIfEmpty(Mono.defer(() -> {
var subscription = new Subscription(); var subscription = new Subscription();
subscription.setMetadata(new Metadata()); subscription.setMetadata(new Metadata());
subscription.getMetadata().setGenerateName("subscription-"); subscription.getMetadata().setGenerateName("subscription-");
subscription.setSpec(new Subscription.Spec()); subscription.setSpec(new Subscription.Spec());
subscription.getSpec().setUnsubscribeToken(Subscription.generateUnsubscribeToken()); subscription.getSpec().setUnsubscribeToken(Subscription.generateUnsubscribeToken());
subscription.getSpec().setSubscriber(subscriber); subscription.getSpec().setSubscriber(subscriber);
Subscription.InterestReason.ensureSubjectHasValue(reason);
subscription.getSpec().setReason(reason); subscription.getSpec().setReason(reason);
return client.create(subscription); return client.create(subscription);
})); }));
@ -93,75 +73,47 @@ public class DefaultNotificationCenter implements NotificationCenter {
@Override @Override
public Mono<Void> unsubscribe(Subscription.Subscriber subscriber) { public Mono<Void> unsubscribe(Subscription.Subscriber subscriber) {
// pagination query all subscriptions of the subscriber to avoid large data return subscriptionService.remove(subscriber).then();
var pageRequest = PageRequestImpl.of(1, 200,
Sort.by("metadata.creationTimestamp", "metadata.name"));
return Flux.defer(() -> pageSubscriptionBy(subscriber, pageRequest))
.expand(page -> page.hasNext()
? pageSubscriptionBy(subscriber, pageRequest.next())
: Mono.empty()
)
.flatMap(page -> Flux.fromIterable(page.getItems()))
.flatMap(client::delete)
.then();
} }
@Override @Override
public Mono<Void> unsubscribe(Subscription.Subscriber subscriber, public Mono<Void> unsubscribe(Subscription.Subscriber subscriber,
Subscription.InterestReason reason) { Subscription.InterestReason reason) {
return listSubscription(subscriber, reason) return subscriptionService.remove(subscriber, reason).then();
.flatMap(client::delete)
.then();
} }
Mono<ListResult<Subscription>> pageSubscriptionBy(Subscription.Subscriber subscriber, Flux<String> getNotifiersBySubscriber(Subscriber subscriber, Reason reason) {
PageRequest pageRequest) {
var listOptions = new ListOptions();
var fieldQuery = equal("spec.subscriber", subscriber.getName());
listOptions.setFieldSelector(FieldSelector.of(fieldQuery));
return client.listBy(Subscription.class, listOptions, pageRequest);
}
Flux<Subscription> listSubscription(Subscription.Subscriber subscriber,
Subscription.InterestReason reason) {
var listOptions = new ListOptions();
var fieldQuery = and(
getSubscriptionFieldQuery(reason.getReasonType(), reason.getSubject()),
equal("spec.subscriber", subscriber.getName())
);
listOptions.setFieldSelector(FieldSelector.of(fieldQuery));
return client.listAll(Subscription.class, listOptions, defaultSort());
}
Flux<String> getNotifiersBySubscriber(Subscription.Subscriber subscriber, Reason reason) {
var reasonType = reason.getSpec().getReasonType(); var reasonType = reason.getSpec().getReasonType();
return userNotificationPreferenceService.getByUser(subscriber.getName()) return userNotificationPreferenceService.getByUser(subscriber.name())
.map(UserNotificationPreference::getReasonTypeNotifier) .map(UserNotificationPreference::getReasonTypeNotifier)
.map(reasonTypeNotification -> reasonTypeNotification.getNotifiers(reasonType)) .map(reasonTypeNotification -> reasonTypeNotification.getNotifiers(reasonType))
.flatMapMany(Flux::fromIterable); .flatMapMany(Flux::fromIterable);
} }
Mono<Void> dispatchNotification(Reason reason, Subscription subscription) { Mono<Void> dispatchNotification(Reason reason, Subscriber subscriber) {
var subscriber = subscription.getSpec().getSubscriber();
return getNotifiersBySubscriber(subscriber, reason) return getNotifiersBySubscriber(subscriber, reason)
.flatMap(notifierName -> client.fetch(NotifierDescriptor.class, notifierName)) .flatMap(notifierName -> client.fetch(NotifierDescriptor.class, notifierName))
.flatMap(descriptor -> prepareNotificationElement(subscription, reason, descriptor)) .flatMap(descriptor -> prepareNotificationElement(subscriber, reason, descriptor))
.flatMap(element -> { .flatMap(element -> {
var dispatchMono = sendNotification(element); var dispatchMono = sendNotification(element);
if (subscriber.isAnonymous()) {
return dispatchMono;
}
// create notification for user
var innerNofificationMono = createNotification(element); var innerNofificationMono = createNotification(element);
return Mono.when(dispatchMono, innerNofificationMono); return Mono.when(dispatchMono, innerNofificationMono);
}) })
.then(); .then();
} }
Mono<NotificationElement> prepareNotificationElement(Subscription subscription, Reason reason, Mono<NotificationElement> prepareNotificationElement(Subscriber subscriber, Reason reason,
NotifierDescriptor descriptor) { NotifierDescriptor descriptor) {
return getLocaleFromSubscriber(subscription) return getLocaleFromSubscriber(subscriber)
.flatMap(locale -> inferenceTemplate(reason, subscription, locale)) .flatMap(locale -> inferenceTemplate(reason, subscriber, locale))
.map(notificationContent -> NotificationElement.builder() .map(notificationContent -> NotificationElement.builder()
.descriptor(descriptor) .descriptor(descriptor)
.reason(reason) .reason(reason)
.subscription(subscription) .subscriber(subscriber)
.reasonType(notificationContent.reasonType()) .reasonType(notificationContent.reasonType())
.notificationTitle(notificationContent.title()) .notificationTitle(notificationContent.title())
.reasonAttributes(notificationContent.reasonAttributes()) .reasonAttributes(notificationContent.reasonAttributes())
@ -173,7 +125,7 @@ public class DefaultNotificationCenter implements NotificationCenter {
Mono<Void> sendNotification(NotificationElement notificationElement) { Mono<Void> sendNotification(NotificationElement notificationElement) {
var descriptor = notificationElement.descriptor(); var descriptor = notificationElement.descriptor();
var subscription = notificationElement.subscription(); var subscriber = notificationElement.subscriber();
final var notifierExtName = descriptor.getSpec().getNotifierExtName(); final var notifierExtName = descriptor.getSpec().getNotifierExtName();
return notificationContextFrom(notificationElement) return notificationContextFrom(notificationElement)
.flatMap(notificationContext -> notificationSender.sendNotification(notifierExtName, .flatMap(notificationContext -> notificationSender.sendNotification(notifierExtName,
@ -181,7 +133,7 @@ public class DefaultNotificationCenter implements NotificationCenter {
.onErrorResume(throwable -> { .onErrorResume(throwable -> {
log.error( log.error(
"Failed to send notification to subscriber [{}] through notifier [{}]", "Failed to send notification to subscriber [{}] through notifier [{}]",
subscription.getSpec().getSubscriber(), subscriber,
descriptor.getSpec().getDisplayName(), descriptor.getSpec().getDisplayName(),
throwable); throwable);
return Mono.empty(); return Mono.empty();
@ -192,9 +144,8 @@ public class DefaultNotificationCenter implements NotificationCenter {
Mono<Notification> createNotification(NotificationElement notificationElement) { Mono<Notification> createNotification(NotificationElement notificationElement) {
var reason = notificationElement.reason(); var reason = notificationElement.reason();
var subscription = notificationElement.subscription(); var subscriber = notificationElement.subscriber();
var subscriber = subscription.getSpec().getSubscriber(); return client.fetch(User.class, subscriber.name())
return client.fetch(User.class, subscriber.getName())
.flatMap(user -> { .flatMap(user -> {
Notification notification = new Notification(); Notification notification = new Notification();
notification.setMetadata(new Metadata()); notification.setMetadata(new Metadata());
@ -203,7 +154,7 @@ public class DefaultNotificationCenter implements NotificationCenter {
notification.getSpec().setTitle(notificationElement.notificationTitle()); notification.getSpec().setTitle(notificationElement.notificationTitle());
notification.getSpec().setRawContent(notificationElement.notificationRawBody()); notification.getSpec().setRawContent(notificationElement.notificationRawBody());
notification.getSpec().setHtmlContent(notificationElement.notificationHtmlBody); notification.getSpec().setHtmlContent(notificationElement.notificationHtmlBody);
notification.getSpec().setRecipient(subscriber.getName()); notification.getSpec().setRecipient(subscriber.name());
notification.getSpec().setReason(reason.getMetadata().getName()); notification.getSpec().setReason(reason.getMetadata().getName());
notification.getSpec().setUnread(true); notification.getSpec().setUnread(true);
return client.create(notification); return client.create(notification);
@ -223,7 +174,7 @@ public class DefaultNotificationCenter implements NotificationCenter {
final var descriptorName = element.descriptor().getMetadata().getName(); final var descriptorName = element.descriptor().getMetadata().getName();
final var reason = element.reason(); final var reason = element.reason();
final var descriptor = element.descriptor(); final var descriptor = element.descriptor();
final var subscription = element.subscription(); final var subscriber = element.subscriber();
var messagePayload = new NotificationContext.MessagePayload(); var messagePayload = new NotificationContext.MessagePayload();
messagePayload.setTitle(element.notificationTitle()); messagePayload.setTitle(element.notificationTitle());
@ -232,7 +183,7 @@ public class DefaultNotificationCenter implements NotificationCenter {
messagePayload.setAttributes(element.reasonAttributes()); messagePayload.setAttributes(element.reasonAttributes());
var message = new NotificationContext.Message(); var message = new NotificationContext.Message();
message.setRecipient(subscription.getSpec().getSubscriber().getName()); message.setRecipient(subscriber.name());
message.setPayload(messagePayload); message.setPayload(messagePayload);
message.setTimestamp(reason.getMetadata().getCreationTimestamp()); message.setTimestamp(reason.getMetadata().getCreationTimestamp());
var reasonSubject = reason.getSpec().getSubject(); var reasonSubject = reason.getSpec().getSubject();
@ -270,25 +221,25 @@ public class DefaultNotificationCenter implements NotificationCenter {
}); });
} }
Mono<NotificationContent> inferenceTemplate(Reason reason, Subscription subscription, Mono<NotificationContent> inferenceTemplate(Reason reason, Subscriber subscriber,
Locale locale) { Locale locale) {
var reasonTypeName = reason.getSpec().getReasonType(); var reasonTypeName = reason.getSpec().getReasonType();
var subscriber = subscription.getSpec().getSubscriber();
return getReasonType(reasonTypeName) return getReasonType(reasonTypeName)
.flatMap(reasonType -> notificationTemplateSelector.select(reasonTypeName, locale) .flatMap(reasonType -> notificationTemplateSelector.select(reasonTypeName, locale)
.flatMap(template -> { .flatMap(template -> {
final var templateContent = template.getSpec().getTemplate(); final var templateContent = template.getSpec().getTemplate();
var model = toReasonAttributes(reason); var model = toReasonAttributes(reason);
var identity = UserIdentity.of(subscriber.getName());
var subscriberInfo = new HashMap<>(); var subscriberInfo = new HashMap<>();
if (identity.isAnonymous()) { if (subscriber.isAnonymous()) {
subscriberInfo.put("displayName", identity.getEmail().orElse("")); subscriberInfo.put("displayName", subscriber.getEmail().orElseThrow());
} else { } else {
subscriberInfo.put("displayName", "@" + identity.name()); subscriberInfo.put("displayName", "@" + subscriber.username());
} }
subscriberInfo.put("id", subscriber.getName()); subscriberInfo.put("id", subscriber.name());
model.put("subscriber", subscriberInfo); model.put("subscriber", subscriberInfo);
model.put("unsubscribeUrl", getUnsubscribeUrl(subscription));
var unsubscriptionMono = getUnsubscribeUrl(subscriber.subscriptionName())
.doOnNext(url -> model.put("unsubscribeUrl", url));
var builder = NotificationContent.builder() var builder = NotificationContent.builder()
.reasonType(reasonType) .reasonType(reasonType)
@ -305,7 +256,7 @@ public class DefaultNotificationCenter implements NotificationCenter {
var htmlBodyMono = notificationTemplateRender var htmlBodyMono = notificationTemplateRender
.render(templateContent.getHtmlBody(), model) .render(templateContent.getHtmlBody(), model)
.doOnNext(builder::htmlBody); .doOnNext(builder::htmlBody);
return Mono.when(titleMono, rawBodyMono, htmlBodyMono) return Mono.when(unsubscriptionMono, titleMono, rawBodyMono, htmlBodyMono)
.then(Mono.fromSupplier(builder::build)); .then(Mono.fromSupplier(builder::build));
}) })
); );
@ -316,13 +267,14 @@ public class DefaultNotificationCenter implements NotificationCenter {
ReasonAttributes reasonAttributes) { ReasonAttributes reasonAttributes) {
} }
String getUnsubscribeUrl(Subscription subscription) { Mono<String> getUnsubscribeUrl(String subscriptionName) {
return subscriptionRouter.getUnsubscribeUrl(subscription); return client.get(Subscription.class, subscriptionName)
.map(subscriptionRouter::getUnsubscribeUrl);
} }
@Builder @Builder
record NotificationElement(ReasonType reasonType, Reason reason, record NotificationElement(ReasonType reasonType, Reason reason,
Subscription subscription, NotifierDescriptor descriptor, Subscriber subscriber, NotifierDescriptor descriptor,
String notificationTitle, String notificationTitle,
String notificationRawBody, String notificationRawBody,
String notificationHtmlBody, String notificationHtmlBody,
@ -333,80 +285,8 @@ public class DefaultNotificationCenter implements NotificationCenter {
return client.get(ReasonType.class, reasonTypeName); return client.get(ReasonType.class, reasonTypeName);
} }
Mono<Locale> getLocaleFromSubscriber(Subscription subscription) { Mono<Locale> getLocaleFromSubscriber(Subscriber subscriber) {
// TODO get locale from subscriber // TODO get locale from subscriber
return Mono.just(Locale.getDefault()); return Mono.just(Locale.getDefault());
} }
Flux<Subscription> listObservers(String reasonTypeName,
Subscription.ReasonSubject reasonSubject) {
Assert.notNull(reasonTypeName, "The reasonTypeName must not be null");
Assert.notNull(reasonSubject, "The reasonSubject must not be null");
final var listOptions = new ListOptions();
var fieldQuery = getSubscriptionFieldQuery(reasonTypeName, reasonSubject);
listOptions.setFieldSelector(FieldSelector.of(fieldQuery));
return distinctByKey(client.listAll(Subscription.class, listOptions, defaultSort()));
}
private static Query getSubscriptionFieldQuery(String reasonTypeName,
Subscription.ReasonSubject reasonSubject) {
var matchAllSubject = new Subscription.ReasonSubject();
matchAllSubject.setKind(reasonSubject.getKind());
matchAllSubject.setApiVersion(reasonSubject.getApiVersion());
return and(equal("spec.reason.reasonType", reasonTypeName),
or(equal("spec.reason.subject", reasonSubject.toString()),
// source reason subject name is blank present match all
equal("spec.reason.subject", matchAllSubject.toString())
)
);
}
static Flux<Subscription> distinctByKey(Flux<Subscription> source) {
final var distinctKeyPredicate = subscriptionDistinctKeyPredicate();
return source.distinct(Function.identity(), HashSet<Subscription>::new,
(set, val) -> {
for (Subscription subscription : set) {
if (distinctKeyPredicate.test(subscription, val)) {
return false;
}
}
// no duplicated return true
set.add(val);
return true;
},
HashSet::clear);
}
Sort defaultSort() {
return Sort.by(Sort.Order.asc("metadata.creationTimestamp"),
Sort.Order.asc("metadata.name"));
}
static BiPredicate<Subscription, Subscription> subscriptionDistinctKeyPredicate() {
return (a, b) -> {
if (!a.getSpec().getSubscriber().equals(b.getSpec().getSubscriber())) {
return false;
}
var reasonA = a.getSpec().getReason();
var reasonB = b.getSpec().getReason();
if (!reasonA.getReasonType().equals(reasonB.getReasonType())) {
return false;
}
var ars = reasonA.getSubject();
var brs = reasonB.getSubject();
var gvkForA =
GroupVersionKind.fromAPIVersionAndKind(ars.getApiVersion(), ars.getKind());
var gvkForB =
GroupVersionKind.fromAPIVersionAndKind(brs.getApiVersion(), brs.getKind());
if (!gvkForA.groupKind().equals(gvkForB.groupKind())) {
return false;
}
// name is blank present match all
if (StringUtils.isBlank(ars.getName()) || StringUtils.isBlank(brs.getName())) {
return true;
}
return ars.getName().equals(brs.getName());
};
}
} }

View File

@ -21,13 +21,12 @@ import run.halo.app.extension.ReactiveExtensionClient;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
public class DefaultSubscriberEmailResolver implements SubscriberEmailResolver { public class DefaultSubscriberEmailResolver implements SubscriberEmailResolver {
private static final String SEPARATOR = "#";
private final ReactiveExtensionClient client; private final ReactiveExtensionClient client;
@Override @Override
public Mono<String> resolve(Subscription.Subscriber subscriber) { public Mono<String> resolve(Subscription.Subscriber subscriber) {
if (isEmailSubscriber(subscriber)) { var identity = UserIdentity.of(subscriber.getName());
if (identity.isAnonymous()) {
return Mono.fromSupplier(() -> getEmail(subscriber)); return Mono.fromSupplier(() -> getEmail(subscriber));
} }
return client.fetch(User.class, subscriber.getName()) return client.fetch(User.class, subscriber.getName())
@ -44,20 +43,14 @@ public class DefaultSubscriberEmailResolver implements SubscriberEmailResolver {
return subscriber; return subscriber;
} }
static boolean isEmailSubscriber(Subscription.Subscriber subscriber) {
return UserIdentity.of(subscriber.getName()).isAnonymous();
}
@NonNull @NonNull
String getEmail(Subscription.Subscriber subscriber) { String getEmail(Subscription.Subscriber subscriber) {
if (!isEmailSubscriber(subscriber)) { var identity = UserIdentity.of(subscriber.getName());
throw new IllegalStateException("The subscriber is not an email subscriber"); if (!identity.isAnonymous()) {
throw new IllegalStateException("The subscriber is not an anonymous subscriber");
} }
var subscriberName = subscriber.getName(); return identity.getEmail()
String email = subscriberName.substring(subscriberName.indexOf(SEPARATOR) + 1); .filter(StringUtils::isNotBlank)
if (StringUtils.isBlank(email)) { .orElseThrow(() -> new IllegalStateException("The subscriber does not have an email"));
throw new IllegalStateException("The subscriber does not have an email");
}
return email;
} }
} }

View File

@ -0,0 +1,9 @@
package run.halo.app.notification;
import reactor.core.publisher.Flux;
import run.halo.app.core.extension.notification.Reason;
public interface RecipientResolver {
Flux<Subscriber> resolve(Reason reason);
}

View File

@ -0,0 +1,116 @@
package run.halo.app.notification;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
import com.google.common.base.Throwables;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.expression.MapAccessor;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.EvaluationException;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.ParseException;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.DataBindingPropertyAccessor;
import org.springframework.expression.spel.support.SimpleEvaluationContext;
import org.springframework.integration.json.JsonPropertyAccessor;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import run.halo.app.core.extension.notification.Reason;
import run.halo.app.core.extension.notification.Subscription;
@Slf4j
@Component
@RequiredArgsConstructor
public class RecipientResolverImpl implements RecipientResolver {
private final ExpressionParser expressionParser = new SpelExpressionParser();
private final EvaluationContext evaluationContext = createEvaluationContext();
private final SubscriptionService subscriptionService;
@Override
public Flux<Subscriber> resolve(Reason reason) {
var reasonType = reason.getSpec().getReasonType();
return subscriptionService.listByPerPage(reasonType)
.filter(this::isNotDisabled)
.filter(subscription -> {
var interestReason = subscription.getSpec().getReason();
if (hasSubject(interestReason)) {
return subjectMatch(subscription, reason.getSpec().getSubject());
} else if (StringUtils.isNotBlank(interestReason.getExpression())) {
return expressionMatch(subscription.getMetadata().getName(),
interestReason.getExpression(), reason);
}
return false;
})
.map(subscription -> {
var id = UserIdentity.of(subscription.getSpec().getSubscriber().getName());
return new Subscriber(id, subscription.getMetadata().getName());
})
.distinct(Subscriber::name);
}
boolean hasSubject(Subscription.InterestReason interestReason) {
return !Subscription.InterestReason.isFallbackSubject(interestReason.getSubject());
}
boolean expressionMatch(String subscriptionName, String expressionStr, Reason reason) {
try {
Expression expression =
expressionParser.parseExpression(expressionStr);
var result = expression.getValue(evaluationContext,
exprRootObject(reason),
Boolean.class);
return BooleanUtils.isTrue(result);
} catch (ParseException | EvaluationException e) {
log.debug("Failed to parse or evaluate expression for subscription [{}], skip it.",
subscriptionName, Throwables.getRootCause(e));
return false;
}
}
Map<String, Object> exprRootObject(Reason reason) {
var map = new HashMap<String, Object>(3, 1);
map.put("props", defaultIfNull(reason.getSpec().getAttributes(), new ReasonAttributes()));
map.put("subject", reason.getSpec().getSubject());
map.put("author", reason.getSpec().getAuthor());
return Collections.unmodifiableMap(map);
}
static boolean subjectMatch(Subscription subscription, Reason.Subject reasonSubject) {
Assert.notNull(subscription, "The subscription must not be null");
Assert.notNull(reasonSubject, "The reasonSubject must not be null");
final var sourceSubject = subscription.getSpec().getReason().getSubject();
var matchSubject = new Subscription.ReasonSubject();
matchSubject.setKind(reasonSubject.getKind());
matchSubject.setApiVersion(reasonSubject.getApiVersion());
if (StringUtils.isBlank(sourceSubject.getName())) {
return sourceSubject.equals(matchSubject);
}
matchSubject.setName(reasonSubject.getName());
return sourceSubject.equals(matchSubject);
}
boolean isNotDisabled(Subscription subscription) {
return !subscription.getSpec().isDisabled();
}
EvaluationContext createEvaluationContext() {
return SimpleEvaluationContext.forPropertyAccessors(
DataBindingPropertyAccessor.forReadOnlyAccess(),
new MapAccessor(),
new JsonPropertyAccessor()
)
.withConversionService(DefaultConversionService.getSharedInstance())
.build();
}
}

View File

@ -0,0 +1,28 @@
package run.halo.app.notification;
import java.util.Optional;
import org.springframework.util.Assert;
import run.halo.app.infra.AnonymousUserConst;
public record Subscriber(UserIdentity identity, String subscriptionName) {
public Subscriber {
Assert.notNull(identity, "The subscriber must not be null");
Assert.hasText(subscriptionName, "The subscription name must not be blank");
}
public String name() {
return identity.name();
}
public String username() {
return identity.isAnonymous() ? AnonymousUserConst.PRINCIPAL : identity.name();
}
public boolean isAnonymous() {
return identity.isAnonymous();
}
public Optional<String> getEmail() {
return identity.getEmail();
}
}

View File

@ -0,0 +1,155 @@
package run.halo.app.notification;
import static run.halo.app.content.NotificationReasonConst.NEW_COMMENT_ON_PAGE;
import static run.halo.app.content.NotificationReasonConst.NEW_COMMENT_ON_POST;
import static run.halo.app.content.NotificationReasonConst.SOMEONE_REPLIED_TO_YOU;
import static run.halo.app.extension.index.query.QueryFactory.and;
import static run.halo.app.extension.index.query.QueryFactory.equal;
import static run.halo.app.extension.index.query.QueryFactory.in;
import static run.halo.app.extension.index.query.QueryFactory.isNull;
import static run.halo.app.extension.index.query.QueryFactory.startsWith;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.lang.NonNull;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import run.halo.app.core.extension.User;
import run.halo.app.core.extension.notification.Subscription;
import run.halo.app.extension.ListOptions;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.router.selector.FieldSelector;
import run.halo.app.infra.AnonymousUserConst;
import run.halo.app.infra.ReactiveExtensionPaginatedOperator;
import run.halo.app.infra.ReactiveExtensionPaginatedOperatorImpl;
/**
* Subscription migration to adapt to the new expression subscribe mechanism.
*
* @author guqing
* @since 2.15.0
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class SubscriptionMigration implements ApplicationListener<ApplicationStartedEvent> {
private final NotificationCenter notificationCenter;
private final ReactiveExtensionClient client;
private final SubscriptionService subscriptionService;
private final ReactiveExtensionPaginatedOperator paginatedOperator;
@Override
@Async
public void onApplicationEvent(@NonNull ApplicationStartedEvent event) {
handleAnonymousSubscription();
cleanupUserSubscription();
}
private void cleanupUserSubscription() {
var listOptions = new ListOptions();
var query = isNull("metadata.deletionTimestamp");
listOptions.setFieldSelector(FieldSelector.of(query));
var iterator =
new ReactiveExtensionPaginatedOperatorImpl(client);
iterator.list(User.class, listOptions)
.map(user -> user.getMetadata().getName())
.flatMap(this::removeInternalSubscriptionForUser)
.then()
.doOnSuccess(unused -> log.info("Cleanup user subscription completed"))
.block();
}
private void handleAnonymousSubscription() {
log.debug("Start to collating anonymous subscription...");
Set<String> anonymousSubscribers = new HashSet<>();
deleteAnonymousSubscription(subscription -> {
var name = subscription.getSpec().getSubscriber().getName();
anonymousSubscribers.add(name);
}).block();
if (anonymousSubscribers.isEmpty()) {
return;
}
// anonymous only subscribe some-one-replied-to-you reason
for (String anonymousSubscriber : anonymousSubscribers) {
createSubscription(anonymousSubscriber,
SOMEONE_REPLIED_TO_YOU,
"props.repliedOwner == '%s'".formatted(anonymousSubscriber)).block();
}
log.info("Collating anonymous subscription completed.");
}
private Mono<Void> deleteAnonymousSubscription(Consumer<Subscription> consumer) {
var listOptions = new ListOptions();
var query = and(startsWith("spec.subscriber", AnonymousUserConst.PRINCIPAL),
isNull("spec.reason.expression"),
isNull("metadata.deletionTimestamp"),
in("spec.reason.reasonType", Set.of(NEW_COMMENT_ON_POST,
NEW_COMMENT_ON_PAGE,
SOMEONE_REPLIED_TO_YOU))
);
listOptions.setFieldSelector(FieldSelector.of(query));
return paginatedOperator.deleteInitialBatch(Subscription.class, listOptions)
.doOnNext(consumer)
.doOnNext(subscription -> log.debug("Deleted anonymous subscription: {}",
subscription.getMetadata().getName())
)
.then();
}
private Mono<Void> removeInternalSubscriptionForUser(String username) {
log.debug("Start to collating internal subscription for user: {}", username);
var subscriber = new Subscription.Subscriber();
subscriber.setName(username);
var listOptions = new ListOptions();
var fieldQuery = and(isNull("metadata.deletionTimestamp"),
equal("spec.subscriber", subscriber.toString()),
in("spec.reason.reasonType", Set.of(
NEW_COMMENT_ON_POST,
NEW_COMMENT_ON_PAGE,
SOMEONE_REPLIED_TO_YOU
))
);
listOptions.setFieldSelector(FieldSelector.of(fieldQuery));
return subscriptionService.removeBy(listOptions)
.map(subscription -> {
var name = subscription.getSpec().getSubscriber().getName();
var reason = subscription.getSpec().getReason();
String expression = switch (reason.getReasonType()) {
case NEW_COMMENT_ON_POST -> "props.postOwner == '%s'".formatted(name);
case NEW_COMMENT_ON_PAGE -> "props.pageOwner == '%s'".formatted(name);
case SOMEONE_REPLIED_TO_YOU -> "props.repliedOwner == '%s'".formatted(name);
// never happen
default -> null;
};
return new SubscriptionSummary(name, reason.getReasonType(), expression);
})
.distinct()
.flatMap(summary -> createSubscription(summary.subscriber(), summary.reasonType(),
summary.expression()))
.then()
.doOnSuccess(unused ->
log.debug("Collating internal subscription for user: {} completed", username));
}
Mono<Void> createSubscription(String name, String reasonType, String expression) {
var interestReason = new Subscription.InterestReason();
interestReason.setReasonType(reasonType);
interestReason.setExpression(expression);
var subscriber = new Subscription.Subscriber();
subscriber.setName(name);
log.debug("Create subscription for user: {} with reasonType: {}", name, reasonType);
return notificationCenter.subscribe(subscriber, interestReason).then();
}
record SubscriptionSummary(String subscriber, String reasonType, String expression) {
}
}

View File

@ -0,0 +1,26 @@
package run.halo.app.notification;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import run.halo.app.core.extension.notification.Subscription;
import run.halo.app.extension.ListOptions;
public interface SubscriptionService {
/**
* <p>List subscriptions by page one by one.only consume one page then next page will be
* loaded.</p>
* <p>Note that: result can not be used to delete the subscription, it is only used to query the
* subscription.</p>
*/
Flux<Subscription> listByPerPage(String reasonType);
Mono<Void> remove(Subscription.Subscriber subscriber,
Subscription.InterestReason interestReasons);
Mono<Void> remove(Subscription.Subscriber subscriber);
Mono<Subscription> remove(Subscription subscription);
Flux<Subscription> removeBy(ListOptions listOptions);
}

View File

@ -0,0 +1,103 @@
package run.halo.app.notification;
import static run.halo.app.extension.index.query.QueryFactory.and;
import static run.halo.app.extension.index.query.QueryFactory.equal;
import static run.halo.app.extension.index.query.QueryFactory.isNull;
import static run.halo.app.extension.index.query.QueryFactory.startsWith;
import java.time.Duration;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import run.halo.app.core.extension.notification.Subscription;
import run.halo.app.extension.ListOptions;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.index.query.Query;
import run.halo.app.extension.router.selector.FieldSelector;
import run.halo.app.infra.ReactiveExtensionPaginatedOperator;
@Component
@RequiredArgsConstructor
public class SubscriptionServiceImpl implements SubscriptionService {
private final ReactiveExtensionClient client;
private final ReactiveExtensionPaginatedOperator paginatedOperator;
@Override
public Mono<Void> remove(Subscription.Subscriber subscriber,
Subscription.InterestReason interestReason) {
Assert.notNull(subscriber, "The subscriber must not be null");
Assert.notNull(interestReason, "The interest reason must not be null");
var reasonType = interestReason.getReasonType();
var expression = interestReason.getExpression();
var subject = interestReason.getSubject();
var listOptions = new ListOptions();
var fieldQuery = and(isNull("metadata.deletionTimestamp"),
equal("spec.subscriber", subscriber.toString()),
equal("spec.reason.reasonType", reasonType));
if (subject != null) {
fieldQuery = and(fieldQuery, reasonSubjectMatch(subject));
}
if (StringUtils.isNotBlank(expression)) {
fieldQuery = and(fieldQuery, equal("spec.reason.expression", expression));
}
listOptions.setFieldSelector(FieldSelector.of(fieldQuery));
return paginatedOperator.deleteInitialBatch(Subscription.class, listOptions).then();
}
@Override
public Mono<Void> remove(Subscription.Subscriber subscriber) {
var listOptions = new ListOptions();
var fieldQuery = and(isNull("metadata.deletionTimestamp"),
equal("spec.subscriber", subscriber.toString()));
listOptions.setFieldSelector(FieldSelector.of(fieldQuery));
return paginatedOperator.deleteInitialBatch(Subscription.class, listOptions)
.then();
}
@Override
public Mono<Subscription> remove(Subscription subscription) {
return client.delete(subscription)
.onErrorResume(OptimisticLockingFailureException.class,
e -> attemptToDelete(subscription.getMetadata().getName()));
}
@Override
public Flux<Subscription> removeBy(ListOptions listOptions) {
return paginatedOperator.deleteInitialBatch(Subscription.class, listOptions);
}
@Override
public Flux<Subscription> listByPerPage(String reasonType) {
final var listOptions = new ListOptions();
var fieldQuery = and(isNull("metadata.deletionTimestamp"),
equal("spec.reason.reasonType", reasonType));
listOptions.setFieldSelector(FieldSelector.of(fieldQuery));
return paginatedOperator.list(Subscription.class, listOptions);
}
private Mono<Subscription> attemptToDelete(String subscriptionName) {
return Mono.defer(() -> client.fetch(Subscription.class, subscriptionName)
.flatMap(client::delete)
)
.retryWhen(Retry.backoff(8, Duration.ofMillis(100))
.filter(OptimisticLockingFailureException.class::isInstance));
}
Query reasonSubjectMatch(Subscription.ReasonSubject reasonSubject) {
Assert.notNull(reasonSubject, "The reasonSubject must not be null");
if (StringUtils.isNotBlank(reasonSubject.getName())) {
return equal("spec.reason.subject", reasonSubject.toString());
}
var matchAllSubject = new Subscription.ReasonSubject();
matchAllSubject.setKind(reasonSubject.getKind());
matchAllSubject.setApiVersion(reasonSubject.getApiVersion());
return startsWith("spec.reason.subject", matchAllSubject.toString());
}
}

View File

@ -79,6 +79,9 @@ spec:
- name: postName - name: postName
type: string type: string
description: "The name of the post." description: "The name of the post."
- name: postOwner
type: string
description: "The user name of the post owner."
- name: postTitle - name: postTitle
type: string type: string
- name: postUrl - name: postUrl
@ -107,6 +110,9 @@ spec:
- name: pageName - name: pageName
type: string type: string
description: "The name of the single page." description: "The name of the single page."
- name: pageOwner
type: string
description: "The user name of the page owner."
- name: pageTitle - name: pageTitle
type: string type: string
- name: pageUrl - name: pageUrl
@ -144,6 +150,12 @@ spec:
type: boolean type: boolean
- name: commentContent - name: commentContent
type: string type: string
- name: repliedOwner
type: string
description: "The owner of the comment or reply that has been replied to."
- name: replyOwner
type: string
description: "The user who created the current reply."
- name: replier - name: replier
type: string type: string
description: "The display name of the replier." description: "The display name of the replier."

View File

@ -454,21 +454,6 @@ class CommentNotificationReasonPublisherTest {
} }
} }
@Test
void identityFromTest() {
var owner = new Comment.CommentOwner();
owner.setKind(User.KIND);
owner.setName("fake-user");
assertThat(CommentNotificationReasonPublisher.identityFrom(owner))
.isEqualTo(UserIdentity.of(owner.getName()));
owner.setKind(Comment.CommentOwner.KIND_EMAIL);
owner.setName("example@example.com");
assertThat(CommentNotificationReasonPublisher.identityFrom(owner))
.isEqualTo(UserIdentity.anonymousWithEmail(owner.getName()));
}
static Comment createComment() { static Comment createComment() {
var comment = new Comment(); var comment = new Comment();
comment.setMetadata(new Metadata()); comment.setMetadata(new Metadata());

View File

@ -7,6 +7,7 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static run.halo.app.content.comment.ReplyNotificationSubscriptionHelper.identityFrom;
import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -24,9 +25,8 @@ import run.halo.app.core.extension.notification.Subscription;
import run.halo.app.extension.GroupVersionKind; import run.halo.app.extension.GroupVersionKind;
import run.halo.app.extension.Metadata; import run.halo.app.extension.Metadata;
import run.halo.app.extension.Ref; import run.halo.app.extension.Ref;
import run.halo.app.infra.AnonymousUserConst;
import run.halo.app.notification.NotificationCenter; import run.halo.app.notification.NotificationCenter;
import run.halo.app.notification.SubscriberEmailResolver; import run.halo.app.notification.UserIdentity;
/** /**
* Tests for {@link ReplyNotificationSubscriptionHelper}. * Tests for {@link ReplyNotificationSubscriptionHelper}.
@ -40,9 +40,6 @@ class ReplyNotificationSubscriptionHelperTest {
@Mock @Mock
NotificationCenter notificationCenter; NotificationCenter notificationCenter;
@Mock
SubscriberEmailResolver subscriberEmailResolver;
@InjectMocks @InjectMocks
ReplyNotificationSubscriptionHelper notificationSubscriptionHelper; ReplyNotificationSubscriptionHelper notificationSubscriptionHelper;
@ -51,17 +48,12 @@ class ReplyNotificationSubscriptionHelperTest {
var comment = createComment(); var comment = createComment();
var spyNotificationSubscriptionHelper = spy(notificationSubscriptionHelper); var spyNotificationSubscriptionHelper = spy(notificationSubscriptionHelper);
doNothing().when(spyNotificationSubscriptionHelper).subscribeReply(any(), any()); doNothing().when(spyNotificationSubscriptionHelper).subscribeReply(any(UserIdentity.class));
spyNotificationSubscriptionHelper.subscribeNewReplyReasonForComment(comment); spyNotificationSubscriptionHelper.subscribeNewReplyReasonForComment(comment);
var reasonSubject = Subscription.ReasonSubject.builder() verify(spyNotificationSubscriptionHelper).subscribeReply(
.apiVersion(comment.getApiVersion()) eq(ReplyNotificationSubscriptionHelper.identityFrom(
.kind(comment.getKind())
.name(comment.getMetadata().getName())
.build();
verify(spyNotificationSubscriptionHelper).subscribeReply(eq(reasonSubject),
eq(ReplyNotificationSubscriptionHelper.Identity.fromCommentOwner(
comment.getSpec().getOwner())) comment.getSpec().getOwner()))
); );
} }
@ -80,17 +72,12 @@ class ReplyNotificationSubscriptionHelperTest {
var spyNotificationSubscriptionHelper = spy(notificationSubscriptionHelper); var spyNotificationSubscriptionHelper = spy(notificationSubscriptionHelper);
doNothing().when(spyNotificationSubscriptionHelper).subscribeReply(any(), any()); doNothing().when(spyNotificationSubscriptionHelper).subscribeReply(any(UserIdentity.class));
spyNotificationSubscriptionHelper.subscribeNewReplyReasonForReply(reply); spyNotificationSubscriptionHelper.subscribeNewReplyReasonForReply(reply);
var reasonSubject = Subscription.ReasonSubject.builder() verify(spyNotificationSubscriptionHelper).subscribeReply(
.apiVersion(reply.getApiVersion()) eq(ReplyNotificationSubscriptionHelper.identityFrom(
.kind(reply.getKind())
.name(reply.getMetadata().getName())
.build();
verify(spyNotificationSubscriptionHelper).subscribeReply(eq(reasonSubject),
eq(ReplyNotificationSubscriptionHelper.Identity.fromCommentOwner(
reply.getSpec().getOwner())) reply.getSpec().getOwner()))
); );
} }
@ -98,48 +85,38 @@ class ReplyNotificationSubscriptionHelperTest {
@Test @Test
void subscribeReplyTest() { void subscribeReplyTest() {
var comment = createComment(); var comment = createComment();
var reasonSubject = Subscription.ReasonSubject.builder() var identity = ReplyNotificationSubscriptionHelper.identityFrom(
.apiVersion(comment.getApiVersion())
.kind(comment.getKind())
.name(comment.getMetadata().getName())
.build();
var identity = ReplyNotificationSubscriptionHelper.Identity.fromCommentOwner(
comment.getSpec().getOwner()); comment.getSpec().getOwner());
when(notificationCenter.subscribe(any(), any())).thenReturn(Mono.empty()); when(notificationCenter.subscribe(any(), any())).thenReturn(Mono.empty());
var subscriber = new Subscription.Subscriber(); var subscriber = new Subscription.Subscriber();
subscriber.setName(AnonymousUserConst.PRINCIPAL + "#" + identity.name()); subscriber.setName(identity.name());
when(subscriberEmailResolver.ofEmail(eq(identity.name())))
.thenReturn(subscriber);
notificationSubscriptionHelper.subscribeReply(reasonSubject, identity); notificationSubscriptionHelper.subscribeReply(identity);
var interestReason = new Subscription.InterestReason(); var interestReason = new Subscription.InterestReason();
interestReason.setReasonType(NotificationReasonConst.SOMEONE_REPLIED_TO_YOU); interestReason.setReasonType(NotificationReasonConst.SOMEONE_REPLIED_TO_YOU);
interestReason.setSubject(reasonSubject); interestReason.setExpression("props.repliedOwner == '%s'".formatted(subscriber.getName()));
verify(notificationCenter).subscribe(eq(subscriber), eq(interestReason)); verify(notificationCenter).subscribe(eq(subscriber), eq(interestReason));
verify(subscriberEmailResolver).ofEmail(eq(identity.name()));
} }
@Nested @Nested
class IdentityTest { class IdentityTest {
@Test @Test
void createForCommentOwner() { void identityFromTest() {
var commentOwner = new Comment.CommentOwner(); var owner = new Comment.CommentOwner();
commentOwner.setKind(Comment.CommentOwner.KIND_EMAIL); owner.setKind(User.KIND);
commentOwner.setName("example@example.com"); owner.setName("fake-user");
var sub = ReplyNotificationSubscriptionHelper.Identity.fromCommentOwner(commentOwner); assertThat(identityFrom(owner))
assertThat(sub.isEmail()).isTrue(); .isEqualTo(UserIdentity.of(owner.getName()));
assertThat(sub.name()).isEqualTo(commentOwner.getName());
commentOwner.setKind(User.KIND); owner.setKind(Comment.CommentOwner.KIND_EMAIL);
commentOwner.setName("fake-user"); owner.setName("example@example.com");
sub = ReplyNotificationSubscriptionHelper.Identity.fromCommentOwner(commentOwner); assertThat(identityFrom(owner))
assertThat(sub.isEmail()).isFalse(); .isEqualTo(UserIdentity.anonymousWithEmail(owner.getName()));
assertThat(sub.name()).isEqualTo(commentOwner.getName());
} }
} }

View File

@ -149,7 +149,7 @@ class PostReconcilerTest {
when(client.fetch(eq(Post.class), eq(name))) when(client.fetch(eq(Post.class), eq(name)))
.thenReturn(Optional.of(post)); .thenReturn(Optional.of(post));
when(postService.getContent(eq(post.getSpec().getReleaseSnapshot()), when(postService.getContent(eq(post.getSpec().getReleaseSnapshot()),
eq(post.getSpec().getBaseSnapshot()))) eq(post.getSpec().getBaseSnapshot())))
.thenReturn(Mono.just(ContentWrapper.builder() .thenReturn(Mono.just(ContentWrapper.builder()
.snapshotName(post.getSpec().getHeadSnapshot()) .snapshotName(post.getSpec().getHeadSnapshot())
.raw("hello world") .raw("hello world")
@ -215,11 +215,7 @@ class PostReconcilerTest {
assertArg(argReason -> { assertArg(argReason -> {
var interestReason = new Subscription.InterestReason(); var interestReason = new Subscription.InterestReason();
interestReason.setReasonType(NotificationReasonConst.NEW_COMMENT_ON_POST); interestReason.setReasonType(NotificationReasonConst.NEW_COMMENT_ON_POST);
interestReason.setSubject(Subscription.ReasonSubject.builder() interestReason.setExpression("props.postOwner == 'null'");
.apiVersion(post.getApiVersion())
.kind(post.getKind())
.name(post.getMetadata().getName())
.build());
assertThat(argReason).isEqualTo(interestReason); assertThat(argReason).isEqualTo(interestReason);
})); }));
} }

View File

@ -232,11 +232,7 @@ class SinglePageReconcilerTest {
assertArg(argReason -> { assertArg(argReason -> {
var interestReason = new Subscription.InterestReason(); var interestReason = new Subscription.InterestReason();
interestReason.setReasonType(NotificationReasonConst.NEW_COMMENT_ON_PAGE); interestReason.setReasonType(NotificationReasonConst.NEW_COMMENT_ON_PAGE);
interestReason.setSubject(Subscription.ReasonSubject.builder() interestReason.setExpression("props.pageOwner == 'null'");
.apiVersion(page.getApiVersion())
.kind(page.getKind())
.name(page.getMetadata().getName())
.build());
assertThat(argReason).isEqualTo(interestReason); assertThat(argReason).isEqualTo(interestReason);
})); }));
} }

View File

@ -22,6 +22,7 @@ import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import run.halo.app.core.extension.Role; import run.halo.app.core.extension.Role;
import run.halo.app.core.extension.RoleBinding; import run.halo.app.core.extension.RoleBinding;
import run.halo.app.core.extension.User; import run.halo.app.core.extension.User;
@ -31,6 +32,7 @@ import run.halo.app.extension.Metadata;
import run.halo.app.extension.controller.Reconciler; import run.halo.app.extension.controller.Reconciler;
import run.halo.app.infra.AnonymousUserConst; import run.halo.app.infra.AnonymousUserConst;
import run.halo.app.infra.ExternalUrlSupplier; import run.halo.app.infra.ExternalUrlSupplier;
import run.halo.app.notification.NotificationCenter;
/** /**
* Tests for {@link UserReconciler}. * Tests for {@link UserReconciler}.
@ -46,6 +48,9 @@ class UserReconcilerTest {
@Mock @Mock
private ExtensionClient client; private ExtensionClient client;
@Mock
private NotificationCenter notificationCenter;
@Mock @Mock
private RoleService roleService; private RoleService roleService;
@ -54,6 +59,7 @@ class UserReconcilerTest {
@BeforeEach @BeforeEach
void setUp() { void setUp() {
lenient().when(notificationCenter.unsubscribe(any(), any())).thenReturn(Mono.empty());
lenient().when(roleService.listRoleRefs(any())).thenReturn(Flux.empty()); lenient().when(roleService.listRoleRefs(any())).thenReturn(Flux.empty());
} }

View File

@ -0,0 +1,107 @@
package run.halo.app.infra;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.domain.Sort;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import run.halo.app.extension.FakeExtension;
import run.halo.app.extension.ListOptions;
import run.halo.app.extension.ListResult;
import run.halo.app.extension.Metadata;
import run.halo.app.extension.PageRequest;
import run.halo.app.extension.ReactiveExtensionClient;
@ExtendWith(MockitoExtension.class)
class ReactiveExtensionPaginatedOperatorImplTest {
@Mock
private ReactiveExtensionClient client;
@InjectMocks
private ReactiveExtensionPaginatedOperatorImpl service;
@Nested
class ListTest {
@BeforeEach
void setUp() {
Instant now = Instant.now();
var items = new ArrayList<>();
// Generate 900 items
for (int j = 0; j < 9; j++) {
items.addAll(generateItems(100, now));
}
// mock new items during the process
Instant otherNow = now.plusSeconds(1000);
items.addAll(generateItems(90, otherNow));
when(client.listBy(any(), any(), any())).thenAnswer(invocation -> {
PageRequest pageRequest = invocation.getArgument(2);
int pageNumber = pageRequest.getPageNumber();
var list = ListResult.subList(items, pageNumber, pageRequest.getPageSize());
var result = new ListResult<>(pageNumber, pageRequest.getPageSize(),
items.size(), list);
return Mono.just(result);
});
}
@Test
public void listTest() {
StepVerifier.create(service.list(FakeExtension.class, new ListOptions()))
.expectNextCount(900)
.verifyComplete();
}
}
@Test
void nextPageTest() {
var result = new ListResult<FakeExtension>(1, 10, 30, List.of());
var sort = Sort.by("metadata.creationTimestamp");
var nextPage = ReactiveExtensionPaginatedOperatorImpl.nextPage(result, sort);
assertThat(nextPage.getPageNumber()).isEqualTo(2);
assertThat(nextPage.getPageSize()).isEqualTo(10);
assertThat(nextPage.getSort()).isEqualTo(sort);
}
@Test
void shouldTakeNextTest() {
var now = Instant.now();
var item = new FakeExtension();
item.setMetadata(new Metadata());
item.getMetadata().setCreationTimestamp(now);
var result = ReactiveExtensionPaginatedOperatorImpl.shouldTakeNext(item, now);
assertThat(result).isTrue();
item.getMetadata().setCreationTimestamp(now.minusSeconds(1));
result = ReactiveExtensionPaginatedOperatorImpl.shouldTakeNext(item, now);
assertThat(result).isTrue();
item.getMetadata().setCreationTimestamp(now.plusSeconds(1));
result = ReactiveExtensionPaginatedOperatorImpl.shouldTakeNext(item, now);
assertThat(result).isFalse();
}
private List<FakeExtension> generateItems(int count, Instant creationTimestamp) {
List<FakeExtension> items = new ArrayList<>();
for (int i = 0; i < count; i++) {
var item = new FakeExtension();
item.setMetadata(new Metadata());
item.getMetadata().setCreationTimestamp(creationTimestamp);
items.add(item);
}
return items;
}
}

View File

@ -13,7 +13,6 @@ import static org.mockito.Mockito.when;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
@ -29,12 +28,8 @@ import run.halo.app.core.extension.notification.NotifierDescriptor;
import run.halo.app.core.extension.notification.Reason; import run.halo.app.core.extension.notification.Reason;
import run.halo.app.core.extension.notification.ReasonType; import run.halo.app.core.extension.notification.ReasonType;
import run.halo.app.core.extension.notification.Subscription; import run.halo.app.core.extension.notification.Subscription;
import run.halo.app.extension.GroupVersion;
import run.halo.app.extension.ListResult;
import run.halo.app.extension.Metadata; import run.halo.app.extension.Metadata;
import run.halo.app.extension.PageRequest;
import run.halo.app.extension.ReactiveExtensionClient; import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.infra.utils.JsonUtils;
/** /**
* Tests for {@link DefaultNotificationCenter}. * Tests for {@link DefaultNotificationCenter}.
@ -60,6 +55,12 @@ class DefaultNotificationCenterTest {
@Mock @Mock
private NotificationSender notificationSender; private NotificationSender notificationSender;
@Mock
private RecipientResolver recipientResolver;
@Mock
private SubscriptionService subscriptionService;
@InjectMocks @InjectMocks
private DefaultNotificationCenter notificationCenter; private DefaultNotificationCenter notificationCenter;
@ -78,21 +79,17 @@ class DefaultNotificationCenterTest {
reason.setMetadata(new Metadata()); reason.setMetadata(new Metadata());
reason.getMetadata().setName("reason-a"); reason.getMetadata().setName("reason-a");
var subscriptionReasonSubject = createNewReplyOnCommentSubject();
var spyNotificationCenter = spy(notificationCenter); var spyNotificationCenter = spy(notificationCenter);
var subscriptions = createSubscriptions(); var subscriber = new Subscriber(UserIdentity.anonymousWithEmail("A"), "fake-name");
doReturn(Flux.fromIterable(subscriptions)) when(recipientResolver.resolve(reason)).thenReturn(Flux.just(subscriber));
.when(spyNotificationCenter).listObservers(eq("new-reply-on-comment"),
eq(subscriptionReasonSubject));
doReturn(Mono.empty()).when(spyNotificationCenter) doReturn(Mono.empty()).when(spyNotificationCenter)
.dispatchNotification(eq(reason), any()); .dispatchNotification(eq(reason), any());
spyNotificationCenter.notify(reason).block(); spyNotificationCenter.notify(reason).block();
verify(spyNotificationCenter).dispatchNotification(eq(reason), any()); verify(spyNotificationCenter).dispatchNotification(eq(reason), any());
verify(spyNotificationCenter).listObservers(eq("new-reply-on-comment"), verify(recipientResolver).resolve(eq(reason));
eq(subscriptionReasonSubject));
} }
List<Subscription> createSubscriptions() { List<Subscription> createSubscriptions() {
@ -129,69 +126,16 @@ class DefaultNotificationCenterTest {
var reason = subscription.getSpec().getReason(); var reason = subscription.getSpec().getReason();
doReturn(Flux.just(subscription)) doReturn(Mono.empty())
.when(spyNotificationCenter).listSubscription(eq(subscriber), eq(reason)); .when(spyNotificationCenter).unsubscribe(eq(subscriber), eq(reason));
when(client.create(any(Subscription.class))).thenReturn(Mono.empty()); when(client.create(any(Subscription.class))).thenReturn(Mono.empty());
spyNotificationCenter.subscribe(subscriber, reason).block(); spyNotificationCenter.subscribe(subscriber, reason).block();
verify(client, times(0)).create(any(Subscription.class));
// not exists subscription will create a new subscription
var newReason = JsonUtils.deepCopy(reason);
newReason.setReasonType("fake-reason-type");
doReturn(Flux.empty())
.when(spyNotificationCenter).listSubscription(eq(subscriber), eq(newReason));
spyNotificationCenter.subscribe(subscriber, newReason).block();
verify(client).create(any(Subscription.class)); verify(client).create(any(Subscription.class));
} }
@Test
public void testUnsubscribe() {
Subscription.Subscriber subscriber = new Subscription.Subscriber();
subscriber.setName("anonymousUser#A");
var spyNotificationCenter = spy(notificationCenter);
var subscriptions = createSubscriptions();
doReturn(Mono.just(new ListResult<>(subscriptions)))
.when(spyNotificationCenter).pageSubscriptionBy(eq(subscriber), any(PageRequest.class));
when(client.delete(any(Subscription.class))).thenReturn(Mono.empty());
spyNotificationCenter.unsubscribe(subscriber).block();
verify(client).delete(any(Subscription.class));
}
@Test
public void testUnsubscribeWithReason() {
var spyNotificationCenter = spy(notificationCenter);
var subscriptions = createSubscriptions();
var subscription = subscriptions.get(0);
var subscriber = subscription.getSpec().getSubscriber();
var reason = subscription.getSpec().getReason();
var newReason = JsonUtils.deepCopy(reason);
newReason.setReasonType("fake-reason-type");
doReturn(Flux.empty())
.when(spyNotificationCenter).listSubscription(eq(subscriber), eq(newReason));
when(client.delete(any(Subscription.class))).thenReturn(Mono.empty());
spyNotificationCenter.unsubscribe(subscriber, newReason).block();
verify(client, times(0)).delete(any(Subscription.class));
doReturn(Flux.just(subscription))
.when(spyNotificationCenter).listSubscription(eq(subscriber), eq(reason));
// exists subscription will be deleted
spyNotificationCenter.unsubscribe(subscriber, reason).block();
verify(client).delete(any(Subscription.class));
}
@Test @Test
public void testGetNotifiersBySubscriber() { public void testGetNotifiersBySubscriber() {
UserNotificationPreference preference = new UserNotificationPreference(); UserNotificationPreference preference = new UserNotificationPreference();
@ -203,8 +147,7 @@ class DefaultNotificationCenterTest {
reason.getMetadata().setName("reason-a"); reason.getMetadata().setName("reason-a");
reason.setSpec(new Reason.Spec()); reason.setSpec(new Reason.Spec());
reason.getSpec().setReasonType("new-reply-on-comment"); reason.getSpec().setReasonType("new-reply-on-comment");
var subscriber = new Subscription.Subscriber(); var subscriber = new Subscriber(UserIdentity.anonymousWithEmail("A"), "fake-name");
subscriber.setName("anonymousUser#A");
notificationCenter.getNotifiersBySubscriber(subscriber, reason) notificationCenter.getNotifiersBySubscriber(subscriber, reason)
.collectList() .collectList()
@ -215,7 +158,7 @@ class DefaultNotificationCenterTest {
}) })
.verifyComplete(); .verifyComplete();
verify(userNotificationPreferenceService).getByUser(eq(subscriber.getName())); verify(userNotificationPreferenceService).getByUser(eq(subscriber.name()));
} }
@Test @Test
@ -234,7 +177,6 @@ class DefaultNotificationCenterTest {
.when(spyNotificationCenter).prepareNotificationElement(any(), any(), any()); .when(spyNotificationCenter).prepareNotificationElement(any(), any(), any());
doReturn(Mono.empty()).when(spyNotificationCenter).sendNotification(any()); doReturn(Mono.empty()).when(spyNotificationCenter).sendNotification(any());
doReturn(Mono.empty()).when(spyNotificationCenter).createNotification(any());
var reason = new Reason(); var reason = new Reason();
reason.setMetadata(new Metadata()); reason.setMetadata(new Metadata());
@ -243,11 +185,15 @@ class DefaultNotificationCenterTest {
reason.getSpec().setReasonType("new-reply-on-comment"); reason.getSpec().setReasonType("new-reply-on-comment");
var subscription = createSubscriptions().get(0); var subscription = createSubscriptions().get(0);
spyNotificationCenter.dispatchNotification(reason, subscription).block(); var subscriptionName = subscription.getMetadata().getName();
var subscriber =
new Subscriber(UserIdentity.of(subscription.getSpec().getSubscriber().getName()),
subscriptionName);
spyNotificationCenter.dispatchNotification(reason, subscriber).block();
verify(client).fetch(eq(NotifierDescriptor.class), eq("email-notifier")); verify(client).fetch(eq(NotifierDescriptor.class), eq("email-notifier"));
verify(spyNotificationCenter).sendNotification(any()); verify(spyNotificationCenter).sendNotification(any());
verify(spyNotificationCenter).createNotification(any()); verify(spyNotificationCenter, times(0)).createNotification(any());
} }
@Test @Test
@ -282,7 +228,7 @@ class DefaultNotificationCenterTest {
var element = mock(DefaultNotificationCenter.NotificationElement.class); var element = mock(DefaultNotificationCenter.NotificationElement.class);
var mockDescriptor = mock(NotifierDescriptor.class); var mockDescriptor = mock(NotifierDescriptor.class);
when(element.descriptor()).thenReturn(mockDescriptor); when(element.descriptor()).thenReturn(mockDescriptor);
when(element.subscription()).thenReturn(mock(Subscription.class)); when(element.subscriber()).thenReturn(mock(Subscriber.class));
var notifierDescriptorSpec = mock(NotifierDescriptor.Spec.class); var notifierDescriptorSpec = mock(NotifierDescriptor.Spec.class);
when(mockDescriptor.getSpec()).thenReturn(notifierDescriptorSpec); when(mockDescriptor.getSpec()).thenReturn(notifierDescriptorSpec);
when(notifierDescriptorSpec.getNotifierExtName()).thenReturn("fake-notifier-ext"); when(notifierDescriptorSpec.getNotifierExtName()).thenReturn("fake-notifier-ext");
@ -299,9 +245,12 @@ class DefaultNotificationCenterTest {
var subscription = createSubscriptions().get(0); var subscription = createSubscriptions().get(0);
var user = mock(User.class); var user = mock(User.class);
var subscriberName = subscription.getSpec().getSubscriber().getName(); var subscriptionName = subscription.getMetadata().getName();
when(client.fetch(eq(User.class), eq(subscriberName))).thenReturn(Mono.just(user)); var subscriber =
when(element.subscription()).thenReturn(subscription); new Subscriber(UserIdentity.of(subscription.getSpec().getSubscriber().getName()),
subscriptionName);
when(client.fetch(eq(User.class), eq(subscriber.name()))).thenReturn(Mono.just(user));
when(element.subscriber()).thenReturn(subscriber);
when(client.create(any(Notification.class))).thenReturn(Mono.empty()); when(client.create(any(Notification.class))).thenReturn(Mono.empty());
@ -314,7 +263,7 @@ class DefaultNotificationCenterTest {
notificationCenter.createNotification(element).block(); notificationCenter.createNotification(element).block();
verify(client).fetch(eq(User.class), eq(subscriberName)); verify(client).fetch(eq(User.class), eq(subscriber.name()));
verify(client).create(any(Notification.class)); verify(client).create(any(Notification.class));
} }
@ -334,8 +283,8 @@ class DefaultNotificationCenterTest {
doReturn(Mono.just(reasonType)) doReturn(Mono.just(reasonType))
.when(spyNotificationCenter).getReasonType(eq(reasonTypeName)); .when(spyNotificationCenter).getReasonType(eq(reasonTypeName));
doReturn("fake-url") doReturn(Mono.just("fake-unsubscribe-url"))
.when(spyNotificationCenter).getUnsubscribeUrl(any()); .when(spyNotificationCenter).getUnsubscribeUrl(anyString());
final var locale = Locale.CHINESE; final var locale = Locale.CHINESE;
@ -356,98 +305,17 @@ class DefaultNotificationCenterTest {
when(notificationTemplateSelector.select(eq(reasonTypeName), any())) when(notificationTemplateSelector.select(eq(reasonTypeName), any()))
.thenReturn(Mono.just(template)); .thenReturn(Mono.just(template));
var subscription = new Subscription(); var subscriber = new Subscriber(UserIdentity.anonymousWithEmail("A"), "fake-name");
subscription.setSpec(new Subscription.Spec());
var subscriber = new Subscription.Subscriber();
subscriber.setName("anonymousUser#A");
subscription.getSpec().setSubscriber(subscriber);
spyNotificationCenter.inferenceTemplate(reason, subscription, locale).block(); spyNotificationCenter.inferenceTemplate(reason, subscriber, locale).block();
verify(spyNotificationCenter).getReasonType(eq(reasonTypeName)); verify(spyNotificationCenter).getReasonType(eq(reasonTypeName));
verify(notificationTemplateSelector).select(eq(reasonTypeName), any()); verify(notificationTemplateSelector).select(eq(reasonTypeName), any());
} }
@Test
void listObserverWhenDuplicateSubscribers() {
var sourceSubscriptions = createSubscriptions();
var subscriptionA = sourceSubscriptions.get(0);
var subscriptionB = JsonUtils.deepCopy(subscriptionA);
var subscriptionC = JsonUtils.deepCopy(subscriptionA);
subscriptionC.getSpec().getReason().getSubject().setName(null);
var subscriptions = Flux.just(subscriptionA, subscriptionB, subscriptionC);
DefaultNotificationCenter.distinctByKey(subscriptions)
.as(StepVerifier::create)
.expectNext(subscriptionA)
.verifyComplete();
}
@Nested
class SubscriptionDistinctKeyPredicateTest {
@Test
void differentSubjectName() {
var subscriptionA = createSubscriptions().get(0);
var subscriptionB = JsonUtils.deepCopy(subscriptionA);
assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate()
.test(subscriptionA, subscriptionB)).isTrue();
subscriptionB.getSpec().getReason().getSubject().setName("other");
assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate()
.test(subscriptionA, subscriptionB)).isFalse();
subscriptionB.getSpec().getReason().getSubject().setName(null);
assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate()
.test(subscriptionA, subscriptionB)).isTrue();
}
@Test
void differentSubjectApiVersion() {
var subscriptionA = createSubscriptions().get(0);
var subscriptionB = JsonUtils.deepCopy(subscriptionA);
assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate()
.test(subscriptionA, subscriptionB)).isTrue();
var subjectA = subscriptionA.getSpec().getReason().getSubject();
var gvForA = GroupVersion.parseAPIVersion(subjectA.getApiVersion());
subscriptionB.getSpec().getReason().getSubject()
.setApiVersion(gvForA.group() + "/otherVersion");
assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate()
.test(subscriptionA, subscriptionB)).isTrue();
}
@Test
void differentReasonType() {
var subscriptionA = createSubscriptions().get(0);
var subscriptionB = JsonUtils.deepCopy(subscriptionA);
assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate()
.test(subscriptionA, subscriptionB)).isTrue();
subscriptionB.getSpec().getReason().setReasonType("other");
assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate()
.test(subscriptionA, subscriptionB)).isFalse();
}
@Test
void differentSubscriber() {
var subscriptionA = createSubscriptions().get(0);
var subscriptionB = JsonUtils.deepCopy(subscriptionA);
assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate()
.test(subscriptionA, subscriptionB)).isTrue();
subscriptionB.getSpec().getSubscriber().setName("other");
assertThat(DefaultNotificationCenter.subscriptionDistinctKeyPredicate()
.test(subscriptionA, subscriptionB)).isFalse();
}
}
@Test @Test
void getLocaleFromSubscriberTest() { void getLocaleFromSubscriberTest() {
var subscription = mock(Subscription.class); var subscription = mock(Subscriber.class);
notificationCenter.getLocaleFromSubscriber(subscription) notificationCenter.getLocaleFromSubscriber(subscription)
.as(StepVerifier::create) .as(StepVerifier::create)

View File

@ -0,0 +1,180 @@
package run.halo.app.notification;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import run.halo.app.core.extension.notification.Reason;
import run.halo.app.core.extension.notification.Subscription;
import run.halo.app.extension.Metadata;
/**
* Tests for {@link RecipientResolverImpl}.
*
* @author guqing
* @since 2.15.0
*/
@ExtendWith(MockitoExtension.class)
class RecipientResolverImplTest {
@Mock
private SubscriptionService subscriptionService;
@InjectMocks
private RecipientResolverImpl recipientResolver;
@Test
void testExpressionMatch() {
var subscriber1 = new Subscription.Subscriber();
subscriber1.setName("test");
final var subscription1 = createSubscription(subscriber1);
subscription1.getMetadata().setName("test-subscription");
subscription1.getSpec().getReason().setSubject(null);
subscription1.getSpec().getReason().setExpression("props.owner == 'test'");
var subscriber2 = new Subscription.Subscriber();
subscriber2.setName("guqing");
final var subscription2 = createSubscription(subscriber2);
subscription2.getMetadata().setName("guqing-subscription");
subscription2.getSpec().getReason().setSubject(null);
subscription2.getSpec().getReason().setExpression("props.owner == 'guqing'");
var reason = new Reason();
reason.setSpec(new Reason.Spec());
reason.getSpec().setReasonType("new-comment-on-post");
reason.getSpec().setSubject(new Reason.Subject());
reason.getSpec().getSubject().setApiVersion("content.halo.run/v1alpha1");
reason.getSpec().getSubject().setKind("Post");
reason.getSpec().getSubject().setName("fake-post");
var reasonAttributes = new ReasonAttributes();
reasonAttributes.put("owner", "guqing");
reason.getSpec().setAttributes(reasonAttributes);
when(subscriptionService.listByPerPage(anyString()))
.thenReturn(Flux.just(subscription1, subscription2));
recipientResolver.resolve(reason)
.as(StepVerifier::create)
.expectNext(new Subscriber(UserIdentity.of("guqing"), "guqing-subscription"))
.verifyComplete();
verify(subscriptionService).listByPerPage(anyString());
}
@Test
void testSubjectMatch() {
var subscriber = new Subscription.Subscriber();
subscriber.setName("test");
Subscription subscription = createSubscription(subscriber);
when(subscriptionService.listByPerPage(anyString()))
.thenReturn(Flux.just(subscription));
var reason = new Reason();
reason.setSpec(new Reason.Spec());
reason.getSpec().setReasonType("new-comment-on-post");
reason.getSpec().setSubject(new Reason.Subject());
reason.getSpec().getSubject().setApiVersion("content.halo.run/v1alpha1");
reason.getSpec().getSubject().setKind("Post");
reason.getSpec().getSubject().setName("fake-post");
recipientResolver.resolve(reason)
.as(StepVerifier::create)
.expectNext(new Subscriber(UserIdentity.of("test"), "fake-subscription"))
.verifyComplete();
verify(subscriptionService).listByPerPage(anyString());
}
@Test
void distinct() {
// same subscriber to different subscriptions
var subscriber = new Subscription.Subscriber();
subscriber.setName("test");
final var subscription1 = createSubscription(subscriber);
subscription1.getMetadata().setName("sub-1");
final var subscription2 = createSubscription(subscriber);
subscription2.getMetadata().setName("sub-2");
subscription2.getSpec().getReason().setSubject(null);
subscription2.getSpec().getReason().setExpression("props.owner == 'guqing'");
when(subscriptionService.listByPerPage(anyString()))
.thenReturn(Flux.just(subscription1, subscription2));
var reason = new Reason();
reason.setSpec(new Reason.Spec());
reason.getSpec().setReasonType("new-comment-on-post");
reason.getSpec().setSubject(new Reason.Subject());
reason.getSpec().getSubject().setApiVersion("content.halo.run/v1alpha1");
reason.getSpec().getSubject().setKind("Post");
reason.getSpec().getSubject().setName("fake-post");
var reasonAttributes = new ReasonAttributes();
reasonAttributes.put("owner", "guqing");
reason.getSpec().setAttributes(reasonAttributes);
recipientResolver.resolve(reason)
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
verify(subscriptionService).listByPerPage(anyString());
}
@Test
void subjectMatchTest() {
var subscriber = new Subscription.Subscriber();
subscriber.setName("test");
final var subscription = createSubscription(subscriber);
// match all name subscription
var subject = new Reason.Subject();
subject.setApiVersion("content.halo.run/v1alpha1");
subject.setKind("Post");
subject.setName("fake-post");
assertThat(RecipientResolverImpl.subjectMatch(subscription, subject)).isTrue();
// different kind
subject = new Reason.Subject();
subject.setApiVersion("content.halo.run/v1alpha1");
subject.setKind("SinglePage");
subject.setName("fake-post");
assertThat(RecipientResolverImpl.subjectMatch(subscription, subject)).isFalse();
// special case
subscription.getSpec().getReason().getSubject().setName("other-post");
subject = new Reason.Subject();
subject.setApiVersion("content.halo.run/v1alpha1");
subject.setKind("Post");
subject.setName("fake-post");
assertThat(RecipientResolverImpl.subjectMatch(subscription, subject)).isFalse();
subject.setName("other-post");
assertThat(RecipientResolverImpl.subjectMatch(subscription, subject)).isTrue();
}
private static Subscription createSubscription(Subscription.Subscriber subscriber) {
Subscription subscription = new Subscription();
subscription.setMetadata(new Metadata());
subscription.getMetadata().setName("fake-subscription");
subscription.setSpec(new Subscription.Spec());
subscription.getSpec().setSubscriber(subscriber);
var interestReason = new Subscription.InterestReason();
interestReason.setReasonType("new-comment-on-post");
interestReason.setSubject(new Subscription.ReasonSubject());
interestReason.getSubject().setApiVersion("content.halo.run/v1alpha1");
interestReason.getSubject().setKind("Post");
subscription.getSpec().setReason(interestReason);
return subscription;
}
}

View File

@ -0,0 +1,74 @@
package run.halo.app.notification;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.dao.OptimisticLockingFailureException;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import run.halo.app.core.extension.notification.Subscription;
import run.halo.app.extension.Metadata;
import run.halo.app.extension.ReactiveExtensionClient;
/**
* Tests for {@link SubscriptionServiceImpl}.
*
* @author guqing
* @since 2.15.0
*/
@ExtendWith(MockitoExtension.class)
class SubscriptionServiceImplTest {
@Mock
private ReactiveExtensionClient client;
@InjectMocks
private SubscriptionServiceImpl subscriptionService;
@Test
void remove() {
var i = new AtomicLong(1L);
when(client.delete(any(Subscription.class))).thenAnswer(invocation -> {
var subscription = (Subscription) invocation.getArgument(0);
if (i.get() != subscription.getMetadata().getVersion()) {
return Mono.error(new OptimisticLockingFailureException("fake-exception"));
}
return Mono.just(subscription);
});
var subscription = new Subscription();
subscription.setMetadata(new Metadata());
subscription.getMetadata().setName("fake-subscription");
subscription.getMetadata().setVersion(0L);
when(client.fetch(eq(Subscription.class), eq("fake-subscription")))
.thenAnswer(invocation -> {
if (i.incrementAndGet() > 3) {
subscription.getMetadata().setVersion(i.get());
} else {
subscription.getMetadata().setVersion(i.get() - 1);
}
return Mono.just(subscription);
});
subscriptionService.remove(subscription)
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
// give version=0, but the real version is 1
// give version=1, but the real version is 2
// give version=2, but the real version is 3
// give version=3, but the real version is 3 (delete success)
verify(client, times(3)).fetch(eq(Subscription.class), eq("fake-subscription"));
}
}

View File

@ -0,0 +1,171 @@
package run.halo.app.notification;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.verify;
import static run.halo.app.extension.index.query.QueryFactory.isNull;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.annotation.DirtiesContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import run.halo.app.core.extension.notification.Subscription;
import run.halo.app.extension.Extension;
import run.halo.app.extension.ExtensionStoreUtil;
import run.halo.app.extension.ListOptions;
import run.halo.app.extension.PageRequestImpl;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.SchemeManager;
import run.halo.app.extension.index.IndexerFactory;
import run.halo.app.extension.router.selector.FieldSelector;
import run.halo.app.extension.store.ReactiveExtensionStoreClient;
import run.halo.app.infra.utils.JsonUtils;
/**
* Integration tests for {@link SubscriptionService}.
*
* @author guqing
* @since 2.15.0
*/
@DirtiesContext
@SpringBootTest
class SubscriptionServiceIntegrationTest {
@Autowired
private SchemeManager schemeManager;
@SpyBean
private ReactiveExtensionClient client;
@Autowired
private ReactiveExtensionStoreClient storeClient;
@Autowired
private IndexerFactory indexerFactory;
Mono<Extension> deleteImmediately(Extension extension) {
var name = extension.getMetadata().getName();
var scheme = schemeManager.get(extension.getClass());
// un-index
var indexer = indexerFactory.getIndexer(extension.groupVersionKind());
indexer.unIndexRecord(extension.getMetadata().getName());
// delete from db
var storeName = ExtensionStoreUtil.buildStoreName(scheme, name);
return storeClient.delete(storeName, extension.getMetadata().getVersion())
.thenReturn(extension);
}
@Nested
class RemoveInitialBatchTest {
static int size = 310;
private final List<Subscription> storedSubscriptions = subscriptionsForStore();
@Autowired
private SubscriptionService subscriptionService;
@BeforeEach
void setUp() {
Flux.fromIterable(storedSubscriptions)
.flatMap(comment -> client.create(comment))
.as(StepVerifier::create)
.expectNextCount(storedSubscriptions.size())
.verifyComplete();
}
@AfterEach
void tearDown() {
Flux.fromIterable(storedSubscriptions)
.flatMap(SubscriptionServiceIntegrationTest.this::deleteImmediately)
.as(StepVerifier::create)
.expectNextCount(storedSubscriptions.size())
.verifyComplete();
}
private List<Subscription> subscriptionsForStore() {
List<Subscription> subscriptions = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
var subscription = createSubscription();
subscription.getMetadata().setName("subscription-" + i);
subscriptions.add(subscription);
}
return subscriptions;
}
@Test
void removeTest() {
var subscriber = new Subscription.Subscriber();
subscriber.setName("admin");
var interestReason = new Subscription.InterestReason();
interestReason.setReasonType("new-comment-on-post");
var subject = new Subscription.ReasonSubject();
subject.setApiVersion("content.halo.run/v1alpha1");
subject.setKind("Post");
interestReason.setSubject(subject);
subscriptionService.remove(subscriber, interestReason).block();
verify(client, atLeast(size)).delete(any(Subscription.class));
assertCleanedUp();
}
@Test
void removeBySubscriberTest() {
var subscriber = new Subscription.Subscriber();
subscriber.setName("admin");
subscriptionService.remove(subscriber).block();
verify(client, atLeast(size)).delete(any(Subscription.class));
assertCleanedUp();
}
private void assertCleanedUp() {
var listOptions = new ListOptions();
listOptions.setFieldSelector(FieldSelector.of(isNull("metadata.deletionTimestamp")));
client.listBy(Subscription.class, listOptions, PageRequestImpl.ofSize(1))
.as(StepVerifier::create)
.consumeNextWith(result -> {
assertThat(result.getTotal()).isEqualTo(0);
assertThat(result.getItems()).isEmpty();
})
.verifyComplete();
}
}
Subscription createSubscription() {
return JsonUtils.jsonToObject("""
{
"spec": {
"subscriber": {
"name": "admin"
},
"unsubscribeToken": "423530c9-bec7-446e-b73b-dd98ac00ba2b",
"reason": {
"reasonType": "new-comment-on-post",
"subject": {
"name": "5152aea5-c2e8-4717-8bba-2263d46e19d5",
"apiVersion": "content.halo.run/v1alpha1",
"kind": "Post"
}
},
"disabled": false
},
"apiVersion": "notification.halo.run/v1alpha1",
"kind": "Subscription",
"metadata": {
"generateName": "subscription-"
}
}
""", Subscription.class);
}
}

View File

@ -19,7 +19,8 @@
设计一个通知功能,可以根据以下目标,实现订阅和推送通知: 设计一个通知功能,可以根据以下目标,实现订阅和推送通知:
- 支持扩展多种通知方式例如邮件、短信、Slack 等。 - 支持扩展多种通知方式例如邮件、短信、Slack 等。
- 支持通知条件并可扩展,例如 Halo 有新文章发布事件如果用户订阅了新文章发布事件但付费订阅插件决定了此文章只有付费用户才可收到通知、按照付费等级不同决定是否发送新文章通知给对应用户等需要通过实现通知条件的扩展点来满足对应需求。 - 支持通知条件并可扩展,例如 Halo
有新文章发布事件如果用户订阅了新文章发布事件但付费订阅插件决定了此文章只有付费用户才可收到通知、按照付费等级不同决定是否发送新文章通知给对应用户等需要通过实现通知条件的扩展点来满足对应需求。
- 支持定制化选项,例如是否开启通知、通知时段等。 - 支持定制化选项,例如是否开启通知、通知时段等。
- 支持通知流程,例如通知的发送、接收、查看、标记等。 - 支持通知流程,例如通知的发送、接收、查看、标记等。
- 通知内容支持多语言。 - 通知内容支持多语言。
@ -97,7 +98,8 @@ spec:
#### Subscription #### Subscription
`Subscription` 自定义模型,定义了特定事件时与要被通知的订阅者之间的关系, 其中 `subscriber` 表示订阅者用户, `unsubscribeToken` 表示退订时的身份验证 token, `reason` 订阅者感兴趣的事件。 `Subscription` 自定义模型,定义了特定事件时与要被通知的订阅者之间的关系, 其中 `subscriber`
表示订阅者用户, `unsubscribeToken` 表示退订时的身份验证 token, `reason` 订阅者感兴趣的事件。
用户可以通过 `Subscription` 来订阅自己感兴趣的事件,当事件触发时会收到通知: 用户可以通过 `Subscription` 来订阅自己感兴趣的事件,当事件触发时会收到通知:
@ -116,13 +118,24 @@ spec:
apiVersion: content.halo.run/v1alpha1 apiVersion: content.halo.run/v1alpha1
kind: Post kind: Post
name: 'post-axgu' name: 'post-axgu'
# expression: 'props.owner == "guqing"'
``` ```
订阅退订链接 API 规则:`/apis/api.notification.halo.run/v1alpha1/subscriptions/{name}/unsubscribe?token={unsubscribeToken}`。 - `spec.reason.subject`:用于根据事件的主体的匹配感兴趣的事件,如果不指定 name 则表示匹配主体与 kind 和 apiVersion
相同的一类事件。
- `spec.expression`:根据表达式匹配感兴趣的事件,例如 `props.owner == "guqing"` 表示只有当事件的属性reason attributes
owner 等于 guqing 时才会触发通知。表达式符合 SpEL
表达式语法,但结果只能是布尔值。参考:[增强 Subscription 模型以支持表达式匹配](https://github.com/halo-dev/halo/issues/5632)
> 当 `spec.expression``spec.reason.subject` 同时存在时,以 `spec.reason.subject` 的结果为准,不建议同时使用。
订阅退订链接 API
规则:`/apis/api.notification.halo.run/v1alpha1/subscriptions/{name}/unsubscribe?token={unsubscribeToken}`。
#### 用户通知偏好设置 #### 用户通知偏好设置
通过在用户偏好设置的 ConfigMap 中存储一个 `notification` key 用于保存事件类型与通知方式的关系设置,当用户订阅了如 'new-comment-on-post' 事件时会获取对应的通知方式来给用户发送通知。 通过在用户偏好设置的 ConfigMap 中存储一个 `notification` key 用于保存事件类型与通知方式的关系设置,当用户订阅了如 '
new-comment-on-post' 事件时会获取对应的通知方式来给用户发送通知。
```yaml ```yaml
apiVersion: v1alpha1 apiVersion: v1alpha1
@ -153,7 +166,8 @@ data:
#### Notification 站内通知 #### Notification 站内通知
当用户订阅到事件后会创建 `Notification`, 它与通知方式notifier无关`recipient` 为用户名,类似站内通知,如用户 `guqing` 订阅了评论事件那么当监听到评论事件时会创建一条记录可以在个人中心的通知列表看到一条通知消息。 当用户订阅到事件后会创建 `Notification`, 它与通知方式notifier无关`recipient` 为用户名,类似站内通知,如用户 `guqing`
订阅了评论事件那么当监听到评论事件时会创建一条记录可以在个人中心的通知列表看到一条通知消息。
```yaml ```yaml
apiVersion: notification.halo.run/v1alpha1 apiVersion: notification.halo.run/v1alpha1
@ -177,6 +191,7 @@ spec:
`GET /apis/api.notification.halo.run/v1alpha1/userspaces/{username}/notifications` `GET /apis/api.notification.halo.run/v1alpha1/userspaces/{username}/notifications`
2. 将通知标记为已读:`PUT /apis/api.notification.halo.run/v1alpha1/userspaces/{username}/notifications/mark-as-read` 2. 将通知标记为已读:`PUT /apis/api.notification.halo.run/v1alpha1/userspaces/{username}/notifications/mark-as-read`
3. 3.
批量将通知标记为已读:`PUT /apis/api.notification.halo.run/v1alpha1/userspaces/{username}/notifications/mark-specified-as-read` 批量将通知标记为已读:`PUT /apis/api.notification.halo.run/v1alpha1/userspaces/{username}/notifications/mark-specified-as-read`
#### 通知模板 #### 通知模板
@ -185,14 +200,18 @@ spec:
它通过定义 `reasonSelector` 来引用事件类别,当事件触发时会根据用户的语言偏好和触发事件的类别来选择一个最佳的通知模板。 它通过定义 `reasonSelector` 来引用事件类别,当事件触发时会根据用户的语言偏好和触发事件的类别来选择一个最佳的通知模板。
选择通知模板的规则为: 选择通知模板的规则为:
1. 根据用户设置的语言,选择从通知模板中定义的 `spec.reasonSelector.language` 的值从更具体到不太具体的顺序例如gl_ES 的值将比 gl 的值具有更高的优先级)。 1. 根据用户设置的语言,选择从通知模板中定义的 `spec.reasonSelector.language` 的值从更具体到不太具体的顺序例如gl_ES 的值将比
2. 当通过语言成功匹配到模板时,匹配到的结果可能不止一个,如 `language``zh_CN` 的模板有三个那么会根据 `NotificationTemplate``metadata.creationTimestamp` 字段来选择一个最新的模板。 gl 的值具有更高的优先级)。
2. 当通过语言成功匹配到模板时,匹配到的结果可能不止一个,如 `language``zh_CN`
的模板有三个那么会根据 `NotificationTemplate``metadata.creationTimestamp` 字段来选择一个最新的模板。
这样的规则有助于用户可以个性化定制某些事件的模板内容。 这样的规则有助于用户可以个性化定制某些事件的模板内容。
模板语法使用 ThymeleafEngine 渲染,纯文本模板使用 `textual` 模板模式,语法参考: [usingthymeleaf.html#textual-syntax](https://www.thymeleaf.org/doc/tutorials/3.1/usingthymeleaf.html#textual-syntax) 模板语法使用 ThymeleafEngine 渲染,纯文本模板使用 `textual`
模板模式,语法参考: [usingthymeleaf.html#textual-syntax](https://www.thymeleaf.org/doc/tutorials/3.1/usingthymeleaf.html#textual-syntax)
`HTML` 则使用标准表达式语法在标签属性中取值,语法参考:[standard-expression-syntax](https://www.thymeleaf.org/doc/tutorials/3.1/usingthymeleaf.html#standard-expression-syntax) `HTML`
则使用标准表达式语法在标签属性中取值,语法参考:[standard-expression-syntax](https://www.thymeleaf.org/doc/tutorials/3.1/usingthymeleaf.html#standard-expression-syntax)
在通知中心渲染模板时会在 `ReasonAttributes` 中提供额外属性包括: 在通知中心渲染模板时会在 `ReasonAttributes` 中提供额外属性包括:
@ -224,7 +243,8 @@ spec:
#### 通知器声明及扩展 #### 通知器声明及扩展
`NotifierDescriptor` 自定义模型用于声明通知器,通过它来描述通知器的名称、描述和关联的 `ExtensionDefinition` 名称,让用户可以在用户界面知道通知器是什么以及它可以做什么, `NotifierDescriptor` 自定义模型用于声明通知器,通过它来描述通知器的名称、描述和关联的 `ExtensionDefinition`
名称,让用户可以在用户界面知道通知器是什么以及它可以做什么,
还让 NotificationCenter 知道如何加载通知器和准备通知器需要的设置以发送通知。 还让 NotificationCenter 知道如何加载通知器和准备通知器需要的设置以发送通知。
```yaml ```yaml
@ -261,52 +281,52 @@ spec:
```java ```java
public interface ReactiveNotifier extends ExtensionPoint { public interface ReactiveNotifier extends ExtensionPoint {
/** /**
* Notify user. * Notify user.
* *
* @param context notification context must not be null * @param context notification context must not be null
*/ */
Mono<Void> notify(NotificationContext context); Mono<Void> notify(NotificationContext context);
} }
@Data @Data
public class NotificationContext { public class NotificationContext {
private Message message; private Message message;
private ObjectNode receiverConfig; private ObjectNode receiverConfig;
private ObjectNode senderConfig; private ObjectNode senderConfig;
@Data @Data
static class Message { static class Message {
private MessagePayload payload; private MessagePayload payload;
private Subject subject; private Subject subject;
private String recipient; private String recipient;
private Instant timestamp; private Instant timestamp;
} }
@Data @Data
public static class Subject { public static class Subject {
private String apiVersion; private String apiVersion;
private String kind; private String kind;
private String name; private String name;
private String title; private String title;
private String url; private String url;
} }
@Data @Data
static class MessagePayload { static class MessagePayload {
private String title; private String title;
private String rawBody; private String rawBody;
private String htmlBody; private String htmlBody;
private ReasonAttributes attributes; private ReasonAttributes attributes;
} }
} }
``` ```

View File

@ -23,6 +23,12 @@ import { InterestReasonSubject } from './interest-reason-subject';
* @interface InterestReason * @interface InterestReason
*/ */
export interface InterestReason { export interface InterestReason {
/**
* The expression to be interested in
* @type {string}
* @memberof InterestReason
*/
'expression'?: string;
/** /**
* The name of the reason definition to be interested in * The name of the reason definition to be interested in
* @type {string} * @type {string}