From 6f6dbb8cc1aac695f24744383375b1504e153728 Mon Sep 17 00:00:00 2001 From: John Niang Date: Tue, 29 Jul 2025 14:51:51 +0800 Subject: [PATCH] Fix the problem that notification might not work anymore (#7643) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### What type of PR is this? /kind improvement /area core /milestone 2.21.x #### What this PR does / why we need it: This PR sets timeout on notification trigger to make sure the procedure won't getting stuck forever. #### Does this PR introduce a user-facing change? ```release-note 修复运行过程中通知器可能失效的问题 ``` --- .../DefaultNotificationCenter.java | 2 -- .../DefaultNotificationSender.java | 27 ++++++++++--------- .../app/notification/NotificationTrigger.java | 12 +++++++-- .../DefaultNotificationSenderTest.java | 4 +-- 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/application/src/main/java/run/halo/app/notification/DefaultNotificationCenter.java b/application/src/main/java/run/halo/app/notification/DefaultNotificationCenter.java index 7eb502281..8d40fe761 100644 --- a/application/src/main/java/run/halo/app/notification/DefaultNotificationCenter.java +++ b/application/src/main/java/run/halo/app/notification/DefaultNotificationCenter.java @@ -11,7 +11,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import run.halo.app.core.extension.User; import run.halo.app.core.extension.notification.Notification; import run.halo.app.core.extension.notification.NotifierDescriptor; @@ -52,7 +51,6 @@ public class DefaultNotificationCenter implements NotificationCenter { log.debug("Dispatching notification to subscriber [{}] for reason [{}]", subscriber, reason.getMetadata().getName()); }) - .publishOn(Schedulers.boundedElastic()) .flatMap(subscriber -> dispatchNotification(reason, subscriber)) .then(); } diff --git a/application/src/main/java/run/halo/app/notification/DefaultNotificationSender.java b/application/src/main/java/run/halo/app/notification/DefaultNotificationSender.java index b00f032ab..147932efb 100644 --- a/application/src/main/java/run/halo/app/notification/DefaultNotificationSender.java +++ b/application/src/main/java/run/halo/app/notification/DefaultNotificationSender.java @@ -11,6 +11,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import run.halo.app.extension.ReactiveExtensionClient; import run.halo.app.extension.controller.Controller; import run.halo.app.extension.controller.ControllerBuilder; @@ -32,6 +33,7 @@ import run.halo.app.plugin.extensionpoint.ExtensionGetter; public class DefaultNotificationSender implements NotificationSender, Reconciler, SmartLifecycle { + private static final Duration TIMEOUT = Duration.ofSeconds(30); private final ReactiveExtensionClient client; private final ExtensionGetter extensionGetter; @@ -56,11 +58,14 @@ public class DefaultNotificationSender @Override public Mono sendNotification(String notifierExtensionName, NotificationContext context) { return selectNotifier(notifierExtensionName) - .doOnNext(notifier -> { - var item = new QueueItem(UUID.randomUUID().toString(), - () -> notifier.notify(context).block(), 0); - requestQueue.addImmediately(item); - }) + .flatMap(notifier -> Mono.fromRunnable( + () -> { + var item = new QueueItem(UUID.randomUUID().toString(), + () -> notifier.notify(context).block(TIMEOUT), 0); + requestQueue.addImmediately(item); + }) + .subscribeOn(Schedulers.boundedElastic()) + ) .then(); } @@ -84,7 +89,7 @@ public class DefaultNotificationSender log.debug("Executing send notification task, [{}] remaining to-do tasks", requestQueue.size()); request.setTimes(request.getTimes() + 1); - request.getTask().execute(); + request.getTask().run(); return Result.doNotRetry(); } @@ -120,8 +125,8 @@ public class DefaultNotificationSender /** *

Queue item for {@link #requestQueue}.

- *

It holds a {@link SendNotificationTask} and a {@link #times} field.

- *

{@link SendNotificationTask} used to send email when consuming.

+ *

It holds a {@link Runnable} and a {@link #times} field.

+ *

{@link Runnable} used to send email when consuming.

*

{@link #times} will be used to record the number of * times the task has been executed, if retry three times on failure, it will be discarded.

*

It also holds a {@link #id} field, which is used to identify the item. queue item with @@ -136,15 +141,11 @@ public class DefaultNotificationSender @EqualsAndHashCode.Include private final String id; - private final SendNotificationTask task; + private final Runnable task; @Setter private int times; } - @FunctionalInterface - interface SendNotificationTask { - void execute(); - } } diff --git a/application/src/main/java/run/halo/app/notification/NotificationTrigger.java b/application/src/main/java/run/halo/app/notification/NotificationTrigger.java index 34f2de062..31c2147ff 100644 --- a/application/src/main/java/run/halo/app/notification/NotificationTrigger.java +++ b/application/src/main/java/run/halo/app/notification/NotificationTrigger.java @@ -1,7 +1,9 @@ package run.halo.app.notification; +import java.time.Duration; import java.util.Set; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import run.halo.app.core.extension.notification.Reason; import run.halo.app.extension.ExtensionClient; @@ -20,11 +22,13 @@ import run.halo.app.extension.controller.Reconciler; * @author guqing * @since 2.10.0 */ +@Slf4j @Component @RequiredArgsConstructor public class NotificationTrigger implements Reconciler { public static final String TRIGGERED_FINALIZER = "triggered"; + private static final Duration TIMEOUT = Duration.ofMinutes(1); private final ExtensionClient client; private final NotificationCenter notificationCenter; @@ -44,14 +48,18 @@ public class NotificationTrigger implements Reconciler { return Result.doNotRetry(); } - public void onNewReasonReceived(Reason reason) { - notificationCenter.notify(reason).block(); + private void onNewReasonReceived(Reason reason) { + var name = reason.getMetadata().getName(); + log.info("Sending notification for reason: {}", name); + notificationCenter.notify(reason).block(TIMEOUT); + log.info("Notification sent for reason: {}", name); } @Override public Controller setupWith(ControllerBuilder builder) { return builder .extension(new Reason()) + .workerCount(10) .build(); } } diff --git a/application/src/test/java/run/halo/app/notification/DefaultNotificationSenderTest.java b/application/src/test/java/run/halo/app/notification/DefaultNotificationSenderTest.java index 3a26b833e..6d5d34cfa 100644 --- a/application/src/test/java/run/halo/app/notification/DefaultNotificationSenderTest.java +++ b/application/src/test/java/run/halo/app/notification/DefaultNotificationSenderTest.java @@ -21,10 +21,10 @@ class DefaultNotificationSenderTest { void equalsTest() { var item1 = new DefaultNotificationSender.QueueItem("1", - mock(DefaultNotificationSender.SendNotificationTask.class), 0); + mock(Runnable.class), 0); var item2 = new DefaultNotificationSender.QueueItem("1", - mock(DefaultNotificationSender.SendNotificationTask.class), 1); + mock(Runnable.class), 1); assertThat(item1).isEqualTo(item2); }