From 34febb4d5d39d802719ef1fa1eeea7d0846a869f Mon Sep 17 00:00:00 2001 From: guqing <38999863+guqing@users.noreply.github.com> Date: Thu, 11 Apr 2024 16:22:11 +0800 Subject: [PATCH] refactor: optimize the query for subscriptions (#5656) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### What type of PR is this? /kind improvement /area core /milestone 2.15.x #### What this PR does / why we need it: 优化通知订阅数据查询 #### Does this PR introduce a user-facing change? ```release-note 优化通知订阅数据查询以解决由于数据过多导致查询慢进而阻塞调用方的问题 ``` --- .../DefaultNotificationCenter.java | 48 ++++++++++++++----- .../DefaultNotificationCenterTest.java | 23 +++++---- 2 files changed, 52 insertions(+), 19 deletions(-) diff --git a/application/src/main/java/run/halo/app/notification/DefaultNotificationCenter.java b/application/src/main/java/run/halo/app/notification/DefaultNotificationCenter.java index 3fc28526b..eb59f22d5 100644 --- a/application/src/main/java/run/halo/app/notification/DefaultNotificationCenter.java +++ b/application/src/main/java/run/halo/app/notification/DefaultNotificationCenter.java @@ -29,8 +29,12 @@ import run.halo.app.core.extension.notification.ReasonType; import run.halo.app.core.extension.notification.Subscription; import run.halo.app.extension.GroupVersionKind; import run.halo.app.extension.ListOptions; +import run.halo.app.extension.ListResult; import run.halo.app.extension.Metadata; +import run.halo.app.extension.PageRequest; +import run.halo.app.extension.PageRequestImpl; import run.halo.app.extension.ReactiveExtensionClient; +import run.halo.app.extension.index.query.Query; import run.halo.app.extension.router.selector.FieldSelector; import run.halo.app.notification.endpoint.SubscriptionRouter; @@ -73,8 +77,7 @@ public class DefaultNotificationCenter implements NotificationCenter { @Override public Mono subscribe(Subscription.Subscriber subscriber, Subscription.InterestReason reason) { - return listSubscription(subscriber) - .filter(subscription -> subscription.getSpec().getReason().equals(reason)) + return listSubscription(subscriber, reason) .next() .switchIfEmpty(Mono.defer(() -> { var subscription = new Subscription(); @@ -90,7 +93,15 @@ public class DefaultNotificationCenter implements NotificationCenter { @Override public Mono unsubscribe(Subscription.Subscriber subscriber) { - return listSubscription(subscriber) + // pagination query all subscriptions of the subscriber to avoid large data + var pageRequest = PageRequestImpl.of(1, 200, + Sort.by("metadata.creationTimestamp", "metadata.name")); + return Flux.defer(() -> pageSubscriptionBy(subscriber, pageRequest)) + .expand(page -> page.hasNext() + ? pageSubscriptionBy(subscriber, pageRequest) + : Mono.empty() + ) + .flatMap(page -> Flux.fromIterable(page.getItems())) .flatMap(client::delete) .then(); } @@ -98,17 +109,27 @@ public class DefaultNotificationCenter implements NotificationCenter { @Override public Mono unsubscribe(Subscription.Subscriber subscriber, Subscription.InterestReason reason) { - return listSubscription(subscriber) - .filter(subscription -> subscription.getSpec().getReason().equals(reason)) + return listSubscription(subscriber, reason) .flatMap(client::delete) .then(); } - Flux listSubscription(Subscription.Subscriber subscriber) { + Mono> pageSubscriptionBy(Subscription.Subscriber subscriber, + PageRequest pageRequest) { var listOptions = new ListOptions(); - listOptions.setFieldSelector(FieldSelector.of( - equal("spec.subscriber", subscriber.toString())) + var fieldQuery = equal("spec.subscriber", subscriber.getName()); + listOptions.setFieldSelector(FieldSelector.of(fieldQuery)); + return client.listBy(Subscription.class, listOptions, pageRequest); + } + + Flux listSubscription(Subscription.Subscriber subscriber, + Subscription.InterestReason reason) { + var listOptions = new ListOptions(); + var fieldQuery = and( + getSubscriptionFieldQuery(reason.getReasonType(), reason.getSubject()), + equal("spec.subscriber", subscriber.getName()) ); + listOptions.setFieldSelector(FieldSelector.of(fieldQuery)); return client.listAll(Subscription.class, listOptions, defaultSort()); } @@ -322,17 +343,22 @@ public class DefaultNotificationCenter implements NotificationCenter { Assert.notNull(reasonTypeName, "The reasonTypeName must not be null"); Assert.notNull(reasonSubject, "The reasonSubject must not be null"); final var listOptions = new ListOptions(); + var fieldQuery = getSubscriptionFieldQuery(reasonTypeName, reasonSubject); + listOptions.setFieldSelector(FieldSelector.of(fieldQuery)); + return distinctByKey(client.listAll(Subscription.class, listOptions, defaultSort())); + } + + private static Query getSubscriptionFieldQuery(String reasonTypeName, + Subscription.ReasonSubject reasonSubject) { var matchAllSubject = new Subscription.ReasonSubject(); matchAllSubject.setKind(reasonSubject.getKind()); matchAllSubject.setApiVersion(reasonSubject.getApiVersion()); - var fieldQuery = and(equal("spec.reason.reasonType", reasonTypeName), + return and(equal("spec.reason.reasonType", reasonTypeName), or(equal("spec.reason.subject", reasonSubject.toString()), // source reason subject name is blank present match all equal("spec.reason.subject", matchAllSubject.toString()) ) ); - listOptions.setFieldSelector(FieldSelector.of(fieldQuery)); - return distinctByKey(client.listAll(Subscription.class, listOptions, defaultSort())); } static Flux distinctByKey(Flux source) { diff --git a/application/src/test/java/run/halo/app/notification/DefaultNotificationCenterTest.java b/application/src/test/java/run/halo/app/notification/DefaultNotificationCenterTest.java index eba34216f..5c97e1b5b 100644 --- a/application/src/test/java/run/halo/app/notification/DefaultNotificationCenterTest.java +++ b/application/src/test/java/run/halo/app/notification/DefaultNotificationCenterTest.java @@ -30,7 +30,9 @@ import run.halo.app.core.extension.notification.Reason; import run.halo.app.core.extension.notification.ReasonType; import run.halo.app.core.extension.notification.Subscription; import run.halo.app.extension.GroupVersion; +import run.halo.app.extension.ListResult; import run.halo.app.extension.Metadata; +import run.halo.app.extension.PageRequest; import run.halo.app.extension.ReactiveExtensionClient; import run.halo.app.infra.utils.JsonUtils; @@ -125,11 +127,11 @@ class DefaultNotificationCenterTest { var subscriber = subscription.getSpec().getSubscriber(); - doReturn(Flux.just(subscription)) - .when(spyNotificationCenter).listSubscription(eq(subscriber)); - var reason = subscription.getSpec().getReason(); + doReturn(Flux.just(subscription)) + .when(spyNotificationCenter).listSubscription(eq(subscriber), eq(reason)); + when(client.create(any(Subscription.class))).thenReturn(Mono.empty()); spyNotificationCenter.subscribe(subscriber, reason).block(); @@ -139,19 +141,21 @@ class DefaultNotificationCenterTest { // not exists subscription will create a new subscription var newReason = JsonUtils.deepCopy(reason); newReason.setReasonType("fake-reason-type"); + doReturn(Flux.empty()) + .when(spyNotificationCenter).listSubscription(eq(subscriber), eq(newReason)); spyNotificationCenter.subscribe(subscriber, newReason).block(); verify(client).create(any(Subscription.class)); } - @Test public void testUnsubscribe() { Subscription.Subscriber subscriber = new Subscription.Subscriber(); subscriber.setName("anonymousUser#A"); var spyNotificationCenter = spy(notificationCenter); var subscriptions = createSubscriptions(); - doReturn(Flux.fromIterable(subscriptions)) - .when(spyNotificationCenter).listSubscription(eq(subscriber)); + + doReturn(Mono.just(new ListResult<>(subscriptions))) + .when(spyNotificationCenter).pageSubscriptionBy(eq(subscriber), any(PageRequest.class)); when(client.delete(any(Subscription.class))).thenReturn(Mono.empty()); @@ -169,17 +173,20 @@ class DefaultNotificationCenterTest { var subscription = subscriptions.get(0); var subscriber = subscription.getSpec().getSubscriber(); - doReturn(Flux.just(subscription)) - .when(spyNotificationCenter).listSubscription(eq(subscriber)); var reason = subscription.getSpec().getReason(); var newReason = JsonUtils.deepCopy(reason); newReason.setReasonType("fake-reason-type"); + doReturn(Flux.empty()) + .when(spyNotificationCenter).listSubscription(eq(subscriber), eq(newReason)); when(client.delete(any(Subscription.class))).thenReturn(Mono.empty()); spyNotificationCenter.unsubscribe(subscriber, newReason).block(); verify(client, times(0)).delete(any(Subscription.class)); + doReturn(Flux.just(subscription)) + .when(spyNotificationCenter).listSubscription(eq(subscriber), eq(reason)); + // exists subscription will be deleted spyNotificationCenter.unsubscribe(subscriber, reason).block(); verify(client).delete(any(Subscription.class));