Fix the problem that notification might not work anymore (#7643)

#### What type of PR is this?

/kind improvement
/area core
/milestone 2.21.x

#### What this PR does / why we need it:

This PR sets timeout on notification trigger to make sure the procedure won't getting stuck forever.

#### Does this PR introduce a user-facing change?

```release-note
修复运行过程中通知器可能失效的问题
```
pull/7645/head
John Niang 2025-07-29 14:51:51 +08:00 committed by GitHub
parent ae9dd6f3d3
commit 6f6dbb8cc1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 26 additions and 19 deletions

View File

@ -11,7 +11,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import run.halo.app.core.extension.User;
import run.halo.app.core.extension.notification.Notification;
import run.halo.app.core.extension.notification.NotifierDescriptor;
@ -52,7 +51,6 @@ public class DefaultNotificationCenter implements NotificationCenter {
log.debug("Dispatching notification to subscriber [{}] for reason [{}]",
subscriber, reason.getMetadata().getName());
})
.publishOn(Schedulers.boundedElastic())
.flatMap(subscriber -> dispatchNotification(reason, subscriber))
.then();
}

View File

@ -11,6 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.controller.Controller;
import run.halo.app.extension.controller.ControllerBuilder;
@ -32,6 +33,7 @@ import run.halo.app.plugin.extensionpoint.ExtensionGetter;
public class DefaultNotificationSender
implements NotificationSender, Reconciler<DefaultNotificationSender.QueueItem>,
SmartLifecycle {
private static final Duration TIMEOUT = Duration.ofSeconds(30);
private final ReactiveExtensionClient client;
private final ExtensionGetter extensionGetter;
@ -56,11 +58,14 @@ public class DefaultNotificationSender
@Override
public Mono<Void> sendNotification(String notifierExtensionName, NotificationContext context) {
return selectNotifier(notifierExtensionName)
.doOnNext(notifier -> {
var item = new QueueItem(UUID.randomUUID().toString(),
() -> notifier.notify(context).block(), 0);
requestQueue.addImmediately(item);
})
.flatMap(notifier -> Mono.fromRunnable(
() -> {
var item = new QueueItem(UUID.randomUUID().toString(),
() -> notifier.notify(context).block(TIMEOUT), 0);
requestQueue.addImmediately(item);
})
.subscribeOn(Schedulers.boundedElastic())
)
.then();
}
@ -84,7 +89,7 @@ public class DefaultNotificationSender
log.debug("Executing send notification task, [{}] remaining to-do tasks",
requestQueue.size());
request.setTimes(request.getTimes() + 1);
request.getTask().execute();
request.getTask().run();
return Result.doNotRetry();
}
@ -120,8 +125,8 @@ public class DefaultNotificationSender
/**
* <p>Queue item for {@link #requestQueue}.</p>
* <p>It holds a {@link SendNotificationTask} and a {@link #times} field.</p>
* <p>{@link SendNotificationTask} used to send email when consuming.</p>
* <p>It holds a {@link Runnable} and a {@link #times} field.</p>
* <p>{@link Runnable} used to send email when consuming.</p>
* <p>{@link #times} will be used to record the number of
* times the task has been executed, if retry three times on failure, it will be discarded.</p>
* <p>It also holds a {@link #id} field, which is used to identify the item. queue item with
@ -136,15 +141,11 @@ public class DefaultNotificationSender
@EqualsAndHashCode.Include
private final String id;
private final SendNotificationTask task;
private final Runnable task;
@Setter
private int times;
}
@FunctionalInterface
interface SendNotificationTask {
void execute();
}
}

View File

@ -1,7 +1,9 @@
package run.halo.app.notification;
import java.time.Duration;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import run.halo.app.core.extension.notification.Reason;
import run.halo.app.extension.ExtensionClient;
@ -20,11 +22,13 @@ import run.halo.app.extension.controller.Reconciler;
* @author guqing
* @since 2.10.0
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class NotificationTrigger implements Reconciler<Reconciler.Request> {
public static final String TRIGGERED_FINALIZER = "triggered";
private static final Duration TIMEOUT = Duration.ofMinutes(1);
private final ExtensionClient client;
private final NotificationCenter notificationCenter;
@ -44,14 +48,18 @@ public class NotificationTrigger implements Reconciler<Reconciler.Request> {
return Result.doNotRetry();
}
public void onNewReasonReceived(Reason reason) {
notificationCenter.notify(reason).block();
private void onNewReasonReceived(Reason reason) {
var name = reason.getMetadata().getName();
log.info("Sending notification for reason: {}", name);
notificationCenter.notify(reason).block(TIMEOUT);
log.info("Notification sent for reason: {}", name);
}
@Override
public Controller setupWith(ControllerBuilder builder) {
return builder
.extension(new Reason())
.workerCount(10)
.build();
}
}

View File

@ -21,10 +21,10 @@ class DefaultNotificationSenderTest {
void equalsTest() {
var item1 =
new DefaultNotificationSender.QueueItem("1",
mock(DefaultNotificationSender.SendNotificationTask.class), 0);
mock(Runnable.class), 0);
var item2 =
new DefaultNotificationSender.QueueItem("1",
mock(DefaultNotificationSender.SendNotificationTask.class), 1);
mock(Runnable.class), 1);
assertThat(item1).isEqualTo(item2);
}