refactor: optimize the query for subscriptions (#5656)

#### What type of PR is this?
/kind improvement
/area core
/milestone 2.15.x

#### What this PR does / why we need it:
优化通知订阅数据查询

#### Does this PR introduce a user-facing change?
```release-note
优化通知订阅数据查询以解决由于数据过多导致查询慢进而阻塞调用方的问题
```
pull/5685/head^2
guqing 2024-04-11 16:22:11 +08:00 committed by GitHub
parent fc79de70fd
commit 34febb4d5d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 52 additions and 19 deletions

View File

@ -29,8 +29,12 @@ import run.halo.app.core.extension.notification.ReasonType;
import run.halo.app.core.extension.notification.Subscription;
import run.halo.app.extension.GroupVersionKind;
import run.halo.app.extension.ListOptions;
import run.halo.app.extension.ListResult;
import run.halo.app.extension.Metadata;
import run.halo.app.extension.PageRequest;
import run.halo.app.extension.PageRequestImpl;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.index.query.Query;
import run.halo.app.extension.router.selector.FieldSelector;
import run.halo.app.notification.endpoint.SubscriptionRouter;
@ -73,8 +77,7 @@ public class DefaultNotificationCenter implements NotificationCenter {
@Override
public Mono<Subscription> subscribe(Subscription.Subscriber subscriber,
Subscription.InterestReason reason) {
return listSubscription(subscriber)
.filter(subscription -> subscription.getSpec().getReason().equals(reason))
return listSubscription(subscriber, reason)
.next()
.switchIfEmpty(Mono.defer(() -> {
var subscription = new Subscription();
@ -90,7 +93,15 @@ public class DefaultNotificationCenter implements NotificationCenter {
@Override
public Mono<Void> unsubscribe(Subscription.Subscriber subscriber) {
return listSubscription(subscriber)
// pagination query all subscriptions of the subscriber to avoid large data
var pageRequest = PageRequestImpl.of(1, 200,
Sort.by("metadata.creationTimestamp", "metadata.name"));
return Flux.defer(() -> pageSubscriptionBy(subscriber, pageRequest))
.expand(page -> page.hasNext()
? pageSubscriptionBy(subscriber, pageRequest)
: Mono.empty()
)
.flatMap(page -> Flux.fromIterable(page.getItems()))
.flatMap(client::delete)
.then();
}
@ -98,17 +109,27 @@ public class DefaultNotificationCenter implements NotificationCenter {
@Override
public Mono<Void> unsubscribe(Subscription.Subscriber subscriber,
Subscription.InterestReason reason) {
return listSubscription(subscriber)
.filter(subscription -> subscription.getSpec().getReason().equals(reason))
return listSubscription(subscriber, reason)
.flatMap(client::delete)
.then();
}
Flux<Subscription> listSubscription(Subscription.Subscriber subscriber) {
Mono<ListResult<Subscription>> pageSubscriptionBy(Subscription.Subscriber subscriber,
PageRequest pageRequest) {
var listOptions = new ListOptions();
listOptions.setFieldSelector(FieldSelector.of(
equal("spec.subscriber", subscriber.toString()))
var fieldQuery = equal("spec.subscriber", subscriber.getName());
listOptions.setFieldSelector(FieldSelector.of(fieldQuery));
return client.listBy(Subscription.class, listOptions, pageRequest);
}
Flux<Subscription> listSubscription(Subscription.Subscriber subscriber,
Subscription.InterestReason reason) {
var listOptions = new ListOptions();
var fieldQuery = and(
getSubscriptionFieldQuery(reason.getReasonType(), reason.getSubject()),
equal("spec.subscriber", subscriber.getName())
);
listOptions.setFieldSelector(FieldSelector.of(fieldQuery));
return client.listAll(Subscription.class, listOptions, defaultSort());
}
@ -322,17 +343,22 @@ public class DefaultNotificationCenter implements NotificationCenter {
Assert.notNull(reasonTypeName, "The reasonTypeName must not be null");
Assert.notNull(reasonSubject, "The reasonSubject must not be null");
final var listOptions = new ListOptions();
var fieldQuery = getSubscriptionFieldQuery(reasonTypeName, reasonSubject);
listOptions.setFieldSelector(FieldSelector.of(fieldQuery));
return distinctByKey(client.listAll(Subscription.class, listOptions, defaultSort()));
}
private static Query getSubscriptionFieldQuery(String reasonTypeName,
Subscription.ReasonSubject reasonSubject) {
var matchAllSubject = new Subscription.ReasonSubject();
matchAllSubject.setKind(reasonSubject.getKind());
matchAllSubject.setApiVersion(reasonSubject.getApiVersion());
var fieldQuery = and(equal("spec.reason.reasonType", reasonTypeName),
return and(equal("spec.reason.reasonType", reasonTypeName),
or(equal("spec.reason.subject", reasonSubject.toString()),
// source reason subject name is blank present match all
equal("spec.reason.subject", matchAllSubject.toString())
)
);
listOptions.setFieldSelector(FieldSelector.of(fieldQuery));
return distinctByKey(client.listAll(Subscription.class, listOptions, defaultSort()));
}
static Flux<Subscription> distinctByKey(Flux<Subscription> source) {

View File

@ -30,7 +30,9 @@ import run.halo.app.core.extension.notification.Reason;
import run.halo.app.core.extension.notification.ReasonType;
import run.halo.app.core.extension.notification.Subscription;
import run.halo.app.extension.GroupVersion;
import run.halo.app.extension.ListResult;
import run.halo.app.extension.Metadata;
import run.halo.app.extension.PageRequest;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.infra.utils.JsonUtils;
@ -125,11 +127,11 @@ class DefaultNotificationCenterTest {
var subscriber = subscription.getSpec().getSubscriber();
doReturn(Flux.just(subscription))
.when(spyNotificationCenter).listSubscription(eq(subscriber));
var reason = subscription.getSpec().getReason();
doReturn(Flux.just(subscription))
.when(spyNotificationCenter).listSubscription(eq(subscriber), eq(reason));
when(client.create(any(Subscription.class))).thenReturn(Mono.empty());
spyNotificationCenter.subscribe(subscriber, reason).block();
@ -139,19 +141,21 @@ class DefaultNotificationCenterTest {
// not exists subscription will create a new subscription
var newReason = JsonUtils.deepCopy(reason);
newReason.setReasonType("fake-reason-type");
doReturn(Flux.empty())
.when(spyNotificationCenter).listSubscription(eq(subscriber), eq(newReason));
spyNotificationCenter.subscribe(subscriber, newReason).block();
verify(client).create(any(Subscription.class));
}
@Test
public void testUnsubscribe() {
Subscription.Subscriber subscriber = new Subscription.Subscriber();
subscriber.setName("anonymousUser#A");
var spyNotificationCenter = spy(notificationCenter);
var subscriptions = createSubscriptions();
doReturn(Flux.fromIterable(subscriptions))
.when(spyNotificationCenter).listSubscription(eq(subscriber));
doReturn(Mono.just(new ListResult<>(subscriptions)))
.when(spyNotificationCenter).pageSubscriptionBy(eq(subscriber), any(PageRequest.class));
when(client.delete(any(Subscription.class))).thenReturn(Mono.empty());
@ -169,17 +173,20 @@ class DefaultNotificationCenterTest {
var subscription = subscriptions.get(0);
var subscriber = subscription.getSpec().getSubscriber();
doReturn(Flux.just(subscription))
.when(spyNotificationCenter).listSubscription(eq(subscriber));
var reason = subscription.getSpec().getReason();
var newReason = JsonUtils.deepCopy(reason);
newReason.setReasonType("fake-reason-type");
doReturn(Flux.empty())
.when(spyNotificationCenter).listSubscription(eq(subscriber), eq(newReason));
when(client.delete(any(Subscription.class))).thenReturn(Mono.empty());
spyNotificationCenter.unsubscribe(subscriber, newReason).block();
verify(client, times(0)).delete(any(Subscription.class));
doReturn(Flux.just(subscription))
.when(spyNotificationCenter).listSubscription(eq(subscriber), eq(reason));
// exists subscription will be deleted
spyNotificationCenter.unsubscribe(subscriber, reason).block();
verify(client).delete(any(Subscription.class));