mirror of https://github.com/halo-dev/halo
Fix the problem that lucene crashes when searching for posts (#2791)
#### What type of PR is this? /kind bug /area core /milestone 2.0.0-rc.2 #### What this PR does / why we need it: This PR mainly refactor PostEventListener into PostEventReconciler to reconcile post events synchronously. After that, the concurrent issue will be gone. #### Which issue(s) this PR fixes: Fixes https://github.com/halo-dev/halo/issues/2756 Fixes https://github.com/halo-dev/halo/issues/2757 #### Does this PR introduce a user-facing change? ```release-note None ```pull/2800/head v2.0.0-rc.2
parent
569cea4ab8
commit
5be038834e
|
@ -1,17 +0,0 @@
|
||||||
package run.halo.app.event.post;
|
|
||||||
|
|
||||||
import org.springframework.context.ApplicationEvent;
|
|
||||||
|
|
||||||
public class PostDeletedEvent extends ApplicationEvent {
|
|
||||||
|
|
||||||
private final String postName;
|
|
||||||
|
|
||||||
public PostDeletedEvent(Object source, String postName) {
|
|
||||||
super(source);
|
|
||||||
this.postName = postName;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getPostName() {
|
|
||||||
return postName;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
package run.halo.app.event.post;
|
||||||
|
|
||||||
|
public interface PostEvent {
|
||||||
|
|
||||||
|
String getName();
|
||||||
|
|
||||||
|
}
|
|
@ -2,7 +2,7 @@ package run.halo.app.event.post;
|
||||||
|
|
||||||
import org.springframework.context.ApplicationEvent;
|
import org.springframework.context.ApplicationEvent;
|
||||||
|
|
||||||
public class PostPublishedEvent extends ApplicationEvent {
|
public class PostPublishedEvent extends ApplicationEvent implements PostEvent {
|
||||||
|
|
||||||
private final String postName;
|
private final String postName;
|
||||||
|
|
||||||
|
@ -11,7 +11,8 @@ public class PostPublishedEvent extends ApplicationEvent {
|
||||||
this.postName = postName;
|
this.postName = postName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getPostName() {
|
@Override
|
||||||
|
public String getName() {
|
||||||
return postName;
|
return postName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ package run.halo.app.event.post;
|
||||||
|
|
||||||
import org.springframework.context.ApplicationEvent;
|
import org.springframework.context.ApplicationEvent;
|
||||||
|
|
||||||
public class PostRecycledEvent extends ApplicationEvent {
|
public class PostRecycledEvent extends ApplicationEvent implements PostEvent {
|
||||||
|
|
||||||
private final String postName;
|
private final String postName;
|
||||||
|
|
||||||
|
@ -11,7 +11,7 @@ public class PostRecycledEvent extends ApplicationEvent {
|
||||||
this.postName = postName;
|
this.postName = postName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getPostName() {
|
public String getName() {
|
||||||
return postName;
|
return postName;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ package run.halo.app.event.post;
|
||||||
|
|
||||||
import org.springframework.context.ApplicationEvent;
|
import org.springframework.context.ApplicationEvent;
|
||||||
|
|
||||||
public class PostUnpublishedEvent extends ApplicationEvent {
|
public class PostUnpublishedEvent extends ApplicationEvent implements PostEvent {
|
||||||
|
|
||||||
private final String postName;
|
private final String postName;
|
||||||
|
|
||||||
|
@ -11,7 +11,8 @@ public class PostUnpublishedEvent extends ApplicationEvent {
|
||||||
this.postName = postName;
|
this.postName = postName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getPostName() {
|
@Override
|
||||||
|
public String getName() {
|
||||||
return postName;
|
return postName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,7 @@ import java.util.function.Supplier;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||||
|
import org.springframework.lang.Nullable;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.util.StopWatch;
|
import org.springframework.util.StopWatch;
|
||||||
import run.halo.app.extension.controller.RequestQueue.DelayedEntry;
|
import run.halo.app.extension.controller.RequestQueue.DelayedEntry;
|
||||||
|
@ -32,6 +33,7 @@ public class DefaultController<R> implements Controller {
|
||||||
|
|
||||||
private final ExecutorService executor;
|
private final ExecutorService executor;
|
||||||
|
|
||||||
|
@Nullable
|
||||||
private final Synchronizer<R> synchronizer;
|
private final Synchronizer<R> synchronizer;
|
||||||
|
|
||||||
private final Duration minDelay;
|
private final Duration minDelay;
|
||||||
|
@ -144,7 +146,9 @@ public class DefaultController<R> implements Controller {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
log.info("Controller worker {} started", this.name);
|
log.info("Controller worker {} started", this.name);
|
||||||
synchronizer.start();
|
if (synchronizer != null) {
|
||||||
|
synchronizer.start();
|
||||||
|
}
|
||||||
while (!isDisposed() && !Thread.currentThread().isInterrupted()) {
|
while (!isDisposed() && !Thread.currentThread().isInterrupted()) {
|
||||||
try {
|
try {
|
||||||
var entry = queue.take();
|
var entry = queue.take();
|
||||||
|
@ -213,7 +217,9 @@ public class DefaultController<R> implements Controller {
|
||||||
disposed = true;
|
disposed = true;
|
||||||
log.info("Disposing controller {}", name);
|
log.info("Disposing controller {}", name);
|
||||||
|
|
||||||
synchronizer.dispose();
|
if (synchronizer != null) {
|
||||||
|
synchronizer.dispose();
|
||||||
|
}
|
||||||
|
|
||||||
executor.shutdownNow();
|
executor.shutdownNow();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -100,10 +100,10 @@ public class LucenePostSearchService implements PostSearchService, DisposableBea
|
||||||
var doc = this.convert(post);
|
var doc = this.convert(post);
|
||||||
try {
|
try {
|
||||||
var seqNum =
|
var seqNum =
|
||||||
writer.updateDocument(new Term(PostDoc.ID_FIELD, post.getName()), doc);
|
writer.updateDocument(new Term(PostDoc.ID_FIELD, post.name()), doc);
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Updated document({}) with sequence number {} returned",
|
log.debug("Updated document({}) with sequence number {} returned",
|
||||||
post.getName(), seqNum);
|
post.name(), seqNum);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw Exceptions.propagate(e);
|
throw Exceptions.propagate(e);
|
||||||
|
@ -142,19 +142,19 @@ public class LucenePostSearchService implements PostSearchService, DisposableBea
|
||||||
|
|
||||||
private Document convert(PostDoc post) {
|
private Document convert(PostDoc post) {
|
||||||
var doc = new Document();
|
var doc = new Document();
|
||||||
doc.add(new StringField("name", post.getName(), YES));
|
doc.add(new StringField("name", post.name(), YES));
|
||||||
doc.add(new StoredField("title", post.getTitle()));
|
doc.add(new StoredField("title", post.title()));
|
||||||
|
|
||||||
var content = Jsoup.clean(stripToEmpty(post.getExcerpt()) + stripToEmpty(post.getContent()),
|
var content = Jsoup.clean(stripToEmpty(post.excerpt()) + stripToEmpty(post.content()),
|
||||||
Safelist.none());
|
Safelist.none());
|
||||||
|
|
||||||
doc.add(new StoredField("content", content));
|
doc.add(new StoredField("content", content));
|
||||||
doc.add(new TextField("searchable", post.getTitle() + content, NO));
|
doc.add(new TextField("searchable", post.title() + content, NO));
|
||||||
|
|
||||||
long publishTimestamp = post.getPublishTimestamp().toEpochMilli();
|
long publishTimestamp = post.publishTimestamp().toEpochMilli();
|
||||||
doc.add(new LongPoint("publishTimestamp", publishTimestamp));
|
doc.add(new LongPoint("publishTimestamp", publishTimestamp));
|
||||||
doc.add(new StoredField("publishTimestamp", publishTimestamp));
|
doc.add(new StoredField("publishTimestamp", publishTimestamp));
|
||||||
doc.add(new StoredField("permalink", post.getPermalink()));
|
doc.add(new StoredField("permalink", post.permalink()));
|
||||||
return doc;
|
return doc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,36 +1,34 @@
|
||||||
package run.halo.app.search.post;
|
package run.halo.app.search.post;
|
||||||
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import lombok.Data;
|
import org.springframework.util.Assert;
|
||||||
import run.halo.app.theme.finders.vo.PostVo;
|
import run.halo.app.theme.finders.vo.PostVo;
|
||||||
|
|
||||||
@Data
|
public record PostDoc(String name,
|
||||||
public class PostDoc {
|
String title,
|
||||||
|
String excerpt,
|
||||||
|
String content,
|
||||||
|
Instant publishTimestamp,
|
||||||
|
String permalink) {
|
||||||
|
|
||||||
public static final String ID_FIELD = "name";
|
public static final String ID_FIELD = "name";
|
||||||
|
|
||||||
private String name;
|
public PostDoc {
|
||||||
|
Assert.hasText(name, "Name must not be blank");
|
||||||
private String title;
|
Assert.hasText(title, "Title must not be blank");
|
||||||
|
Assert.hasText(permalink, "Permalink must not be blank");
|
||||||
private String excerpt;
|
Assert.notNull(publishTimestamp, "PublishTimestamp must not be null");
|
||||||
|
}
|
||||||
private String content;
|
|
||||||
|
|
||||||
private Instant publishTimestamp;
|
|
||||||
|
|
||||||
private String permalink;
|
|
||||||
|
|
||||||
// TODO Move this static method to other place.
|
// TODO Move this static method to other place.
|
||||||
public static PostDoc from(PostVo postVo) {
|
public static PostDoc from(PostVo postVo) {
|
||||||
var post = new PostDoc();
|
return new PostDoc(
|
||||||
post.setName(postVo.getMetadata().getName());
|
postVo.getMetadata().getName(),
|
||||||
post.setTitle(postVo.getSpec().getTitle());
|
postVo.getSpec().getTitle(),
|
||||||
post.setExcerpt(postVo.getStatus().getExcerpt());
|
postVo.getStatus().getExcerpt(),
|
||||||
post.setPublishTimestamp(postVo.getSpec().getPublishTime());
|
postVo.getContent().getContent(),
|
||||||
post.setContent(postVo.getContent().getContent());
|
postVo.getSpec().getPublishTime(),
|
||||||
post.setPermalink(postVo.getStatus().getPermalink());
|
postVo.getStatus().getPermalink()
|
||||||
return post;
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,76 +0,0 @@
|
||||||
package run.halo.app.search.post;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import org.springframework.context.event.EventListener;
|
|
||||||
import org.springframework.scheduling.annotation.Async;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
import reactor.core.Exceptions;
|
|
||||||
import run.halo.app.event.post.PostPublishedEvent;
|
|
||||||
import run.halo.app.event.post.PostRecycledEvent;
|
|
||||||
import run.halo.app.event.post.PostUnpublishedEvent;
|
|
||||||
import run.halo.app.plugin.extensionpoint.ExtensionGetter;
|
|
||||||
import run.halo.app.theme.finders.PostFinder;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
public class PostEventListener {
|
|
||||||
|
|
||||||
private final ExtensionGetter extensionGetter;
|
|
||||||
|
|
||||||
private final PostFinder postFinder;
|
|
||||||
|
|
||||||
public PostEventListener(ExtensionGetter extensionGetter,
|
|
||||||
PostFinder postFinder) {
|
|
||||||
this.extensionGetter = extensionGetter;
|
|
||||||
this.postFinder = postFinder;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Async
|
|
||||||
@EventListener(PostPublishedEvent.class)
|
|
||||||
public void handlePostPublished(PostPublishedEvent publishedEvent) throws InterruptedException {
|
|
||||||
var latch = new CountDownLatch(1);
|
|
||||||
postFinder.getByName(publishedEvent.getPostName())
|
|
||||||
.map(PostDoc::from)
|
|
||||||
.flatMap(postDoc -> extensionGetter.getEnabledExtension(PostSearchService.class)
|
|
||||||
.doOnNext(searchService -> {
|
|
||||||
try {
|
|
||||||
searchService.addDocuments(List.of(postDoc));
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw Exceptions.propagate(e);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
)
|
|
||||||
.doFinally(signalType -> latch.countDown())
|
|
||||||
.subscribe();
|
|
||||||
latch.await();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Async
|
|
||||||
@EventListener(PostUnpublishedEvent.class)
|
|
||||||
public void handlePostUnpublished(PostUnpublishedEvent unpublishedEvent)
|
|
||||||
throws InterruptedException {
|
|
||||||
deletePostDoc(unpublishedEvent.getPostName());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Async
|
|
||||||
@EventListener(PostRecycledEvent.class)
|
|
||||||
public void handlePostRecycled(PostRecycledEvent recycledEvent) throws InterruptedException {
|
|
||||||
deletePostDoc(recycledEvent.getPostName());
|
|
||||||
}
|
|
||||||
|
|
||||||
void deletePostDoc(String postName) throws InterruptedException {
|
|
||||||
var latch = new CountDownLatch(1);
|
|
||||||
extensionGetter.getEnabledExtension(PostSearchService.class)
|
|
||||||
.doOnNext(searchService -> {
|
|
||||||
try {
|
|
||||||
searchService.removeDocuments(Set.of(postName));
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw Exceptions.propagate(e);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.doFinally(signalType -> latch.countDown())
|
|
||||||
.subscribe();
|
|
||||||
latch.await();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,160 @@
|
||||||
|
package run.halo.app.search.post;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.SmartLifecycle;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import reactor.core.Exceptions;
|
||||||
|
import run.halo.app.event.post.PostEvent;
|
||||||
|
import run.halo.app.event.post.PostPublishedEvent;
|
||||||
|
import run.halo.app.event.post.PostRecycledEvent;
|
||||||
|
import run.halo.app.event.post.PostUnpublishedEvent;
|
||||||
|
import run.halo.app.extension.controller.Controller;
|
||||||
|
import run.halo.app.extension.controller.ControllerBuilder;
|
||||||
|
import run.halo.app.extension.controller.DefaultController;
|
||||||
|
import run.halo.app.extension.controller.DefaultDelayQueue;
|
||||||
|
import run.halo.app.extension.controller.Reconciler;
|
||||||
|
import run.halo.app.extension.controller.RequestQueue;
|
||||||
|
import run.halo.app.plugin.extensionpoint.ExtensionGetter;
|
||||||
|
import run.halo.app.theme.finders.PostFinder;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class PostEventReconciler implements Reconciler<PostEvent>, SmartLifecycle {
|
||||||
|
|
||||||
|
private final ExtensionGetter extensionGetter;
|
||||||
|
|
||||||
|
private final PostFinder postFinder;
|
||||||
|
|
||||||
|
private final RequestQueue<PostEvent> postEventQueue;
|
||||||
|
|
||||||
|
|
||||||
|
private final Controller postEventController;
|
||||||
|
|
||||||
|
private boolean running = false;
|
||||||
|
|
||||||
|
public PostEventReconciler(ExtensionGetter extensionGetter,
|
||||||
|
PostFinder postFinder) {
|
||||||
|
this.extensionGetter = extensionGetter;
|
||||||
|
this.postFinder = postFinder;
|
||||||
|
|
||||||
|
postEventQueue = new DefaultDelayQueue<>(Instant::now);
|
||||||
|
postEventController = this.setupWith(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result reconcile(PostEvent postEvent) {
|
||||||
|
if (postEvent instanceof PostPublishedEvent) {
|
||||||
|
try {
|
||||||
|
addPostDoc(postEvent.getName());
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw Exceptions.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (postEvent instanceof PostUnpublishedEvent
|
||||||
|
|| postEvent instanceof PostRecycledEvent) {
|
||||||
|
try {
|
||||||
|
deletePostDoc(postEvent.getName());
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw Exceptions.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Controller setupWith(ControllerBuilder builder) {
|
||||||
|
return new DefaultController<>(
|
||||||
|
this.getClass().getName(),
|
||||||
|
this,
|
||||||
|
postEventQueue,
|
||||||
|
null,
|
||||||
|
Duration.ofMillis(100),
|
||||||
|
Duration.ofSeconds(1000)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventListener(PostPublishedEvent.class)
|
||||||
|
public void handlePostPublished(PostPublishedEvent publishedEvent) {
|
||||||
|
postEventQueue.addImmediately(publishedEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventListener(PostUnpublishedEvent.class)
|
||||||
|
public void handlePostUnpublished(PostUnpublishedEvent unpublishedEvent) {
|
||||||
|
postEventQueue.addImmediately(unpublishedEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventListener(PostRecycledEvent.class)
|
||||||
|
public void handlePostRecycled(PostRecycledEvent recycledEvent) {
|
||||||
|
postEventQueue.addImmediately(recycledEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addPostDoc(String postName) throws InterruptedException {
|
||||||
|
var latch = new CountDownLatch(1);
|
||||||
|
var disposable = postFinder.getByName(postName)
|
||||||
|
.map(PostDoc::from)
|
||||||
|
.flatMap(postDoc -> extensionGetter.getEnabledExtension(PostSearchService.class)
|
||||||
|
.doOnNext(searchService -> {
|
||||||
|
try {
|
||||||
|
searchService.addDocuments(List.of(postDoc));
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw Exceptions.propagate(e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
.doFinally(signalType -> latch.countDown())
|
||||||
|
.subscribe(service -> {
|
||||||
|
}, throwable -> {
|
||||||
|
throw Exceptions.propagate(throwable);
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
latch.await();
|
||||||
|
} finally {
|
||||||
|
disposable.dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void deletePostDoc(String postName) throws InterruptedException {
|
||||||
|
var latch = new CountDownLatch(1);
|
||||||
|
var disposable = extensionGetter.getEnabledExtension(PostSearchService.class)
|
||||||
|
.doOnNext(searchService -> {
|
||||||
|
try {
|
||||||
|
searchService.removeDocuments(Set.of(postName));
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw Exceptions.propagate(e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.doFinally(signalType -> latch.countDown())
|
||||||
|
.subscribe(service -> {
|
||||||
|
}, throwable -> {
|
||||||
|
throw Exceptions.propagate(throwable);
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
latch.await();
|
||||||
|
} finally {
|
||||||
|
disposable.dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
postEventController.start();
|
||||||
|
running = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
running = false;
|
||||||
|
postEventController.dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRunning() {
|
||||||
|
return running;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue