From dcef5d4157ab6f8ac193f3e0a2b055017fa26312 Mon Sep 17 00:00:00 2001 From: guqing <38999863+guqing@users.noreply.github.com> Date: Mon, 5 Feb 2024 12:09:07 +0800 Subject: [PATCH 1/3] fix: failure in data query due to reconciler triggered by uncommitted transaction (#5323) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### What type of PR is this? /kind bugfix /milestone 2.12.x /area core #### What this PR does / why we need it: 修复事务未提交便触发控制器执行可能导致数据状态不正确的问题 **how to test it?** 1. 测试如 #5315 的问题是否还存在 2. 测试添加重名自定义模型对象会抛出异常且数据被回滚 #### Which issue(s) this PR fixes: Fixes #5315 #### Does this PR introduce a user-facing change? ```release-note 修复事务未提交便触发控制器执行可能导致数据状态不正确的问题 ``` --- .../ReactiveExtensionClientImpl.java | 35 +++++++++++++++---- .../ReactiveExtensionClientTest.java | 9 +++++ 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java index 2bbe08ad0..c821e4491 100644 --- a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java +++ b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java @@ -26,7 +26,8 @@ import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; import org.springframework.data.util.Predicates; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @@ -40,7 +41,6 @@ import run.halo.app.extension.store.ReactiveExtensionStoreClient; @Slf4j @Component -@RequiredArgsConstructor public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { private final ReactiveExtensionStoreClient client; @@ -60,6 +60,28 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { private final ConcurrentMap indexBuildingState = new ConcurrentHashMap<>(); + private TransactionalOperator transactionalOperator; + + public ReactiveExtensionClientImpl(ReactiveExtensionStoreClient client, + ExtensionConverter converter, SchemeManager schemeManager, ObjectMapper objectMapper, + IndexerFactory indexerFactory, IndexedQueryEngine indexedQueryEngine, + ReactiveTransactionManager reactiveTransactionManager) { + this.client = client; + this.converter = converter; + this.schemeManager = schemeManager; + this.objectMapper = objectMapper; + this.indexerFactory = indexerFactory; + this.indexedQueryEngine = indexedQueryEngine; + this.transactionalOperator = TransactionalOperator.create(reactiveTransactionManager); + } + + /** + * Only for test. + */ + void setTransactionalOperator(TransactionalOperator transactionalOperator) { + this.transactionalOperator = transactionalOperator; + } + @Override public Flux list(Class type, Predicate predicate, Comparator comparator) { @@ -151,7 +173,6 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { } @Override - @Transactional public Mono create(E extension) { checkClientWritable(extension); return Mono.just(extension) @@ -185,7 +206,6 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { } @Override - @Transactional public Mono update(E extension) { checkClientWritable(extension); // Refactor the atomic reference if we have a better solution. @@ -223,7 +243,6 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { } @Override - @Transactional public Mono delete(E extension) { checkClientWritable(extension); // set deletionTimestamp @@ -247,7 +266,8 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { var indexer = indexerFactory.getIndexer(gvk); return client.create(name, data) .map(created -> converter.convertFrom(type, created)) - .doOnNext(indexer::indexRecord); + .doOnNext(indexer::indexRecord) + .as(transactionalOperator::transactional); }); } @@ -258,7 +278,8 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { var indexer = indexerFactory.getIndexer(oldExtension.groupVersionKind()); return client.update(name, version, data) .map(updated -> converter.convertFrom(type, updated)) - .doOnNext(indexer::updateRecord); + .doOnNext(indexer::updateRecord) + .as(transactionalOperator::transactional); }); } diff --git a/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java b/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java index dc7cd9528..c46e37700 100644 --- a/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java +++ b/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java @@ -36,6 +36,8 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -63,6 +65,9 @@ class ReactiveExtensionClientTest { @Mock IndexerFactory indexerFactory; + @Mock + ReactiveTransactionManager reactiveTransactionManager; + @Spy ObjectMapper objectMapper = JsonMapper.builder() .addModule(new JavaTimeModule()) @@ -76,6 +81,10 @@ class ReactiveExtensionClientTest { lenient().when(schemeManager.get(eq(FakeExtension.class))) .thenReturn(fakeScheme); lenient().when(schemeManager.get(eq(fakeScheme.groupVersionKind()))).thenReturn(fakeScheme); + var transactionalOperator = mock(TransactionalOperator.class); + client.setTransactionalOperator(transactionalOperator); + lenient().when(transactionalOperator.transactional(any(Mono.class))) + .thenAnswer(invocation -> invocation.getArgument(0)); } FakeExtension createFakeExtension(String name, Long version) { From 06a44d05dc019917cda2637a7a2f88d55e25374c Mon Sep 17 00:00:00 2001 From: guqing <38999863+guqing@users.noreply.github.com> Date: Mon, 5 Feb 2024 14:31:13 +0800 Subject: [PATCH 2/3] fix: possible incorrect result set obtained by Gc synchronizer when sartup (#5325) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### What type of PR is this? /kind bug /area core /milestone 2.13.x #### What this PR does / why we need it: 修复启动时 GcSynchronizer 没有精准过滤出所需数据导致内存占用会出现较高峰值的问题 #### Which issue(s) this PR fixes: Fixes #5324 #### Does this PR introduce a user-facing change? ```release-note 修复启动时 GcSynchronizer 没有精准过滤出所需数据导致内存占用会出现较高峰值的问题 ``` --- .../halo/app/extension/gc/GcSynchronizer.java | 17 ++--------------- .../index/IndexedQueryEngineImpl.java | 18 ++++++++---------- 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/application/src/main/java/run/halo/app/extension/gc/GcSynchronizer.java b/application/src/main/java/run/halo/app/extension/gc/GcSynchronizer.java index bea81ae0a..7fdeab68b 100644 --- a/application/src/main/java/run/halo/app/extension/gc/GcSynchronizer.java +++ b/application/src/main/java/run/halo/app/extension/gc/GcSynchronizer.java @@ -1,9 +1,6 @@ package run.halo.app.extension.gc; -import static run.halo.app.extension.Comparators.compareCreationTimestamp; - import java.util.List; -import java.util.function.Predicate; import org.springframework.data.domain.Sort; import run.halo.app.extension.Extension; import run.halo.app.extension.ExtensionClient; @@ -64,8 +61,6 @@ class GcSynchronizer implements Synchronizer { if (event instanceof SchemeRegistered registeredEvent) { var newScheme = registeredEvent.getNewScheme(); listDeleted(newScheme.type()).forEach(watcher::onDelete); - client.list(newScheme.type(), deleted(), compareCreationTimestamp(true)) - .forEach(watcher::onDelete); } }); client.watch(watcher); @@ -77,16 +72,8 @@ class GcSynchronizer implements Synchronizer { List listDeleted(Class type) { var options = new ListOptions() .setFieldSelector( - FieldSelector.of(QueryFactory.all("metadata.deletionTimestamp")) + FieldSelector.of(QueryFactory.isNotNull("metadata.deletionTimestamp")) ); - return client.listAll(type, options, Sort.by("metadata.creationTimestamp")) - .stream() - .sorted(compareCreationTimestamp(true)) - .toList(); + return client.listAll(type, options, Sort.by(Sort.Order.asc("metadata.creationTimestamp"))); } - - private Predicate deleted() { - return extension -> extension.getMetadata().getDeletionTimestamp() != null; - } - } diff --git a/application/src/main/java/run/halo/app/extension/index/IndexedQueryEngineImpl.java b/application/src/main/java/run/halo/app/extension/index/IndexedQueryEngineImpl.java index ff11f4ca6..b3723fcb8 100644 --- a/application/src/main/java/run/halo/app/extension/index/IndexedQueryEngineImpl.java +++ b/application/src/main/java/run/halo/app/extension/index/IndexedQueryEngineImpl.java @@ -20,7 +20,6 @@ import run.halo.app.extension.GroupVersionKind; import run.halo.app.extension.ListOptions; import run.halo.app.extension.ListResult; import run.halo.app.extension.PageRequest; -import run.halo.app.extension.index.query.All; import run.halo.app.extension.index.query.QueryIndexViewImpl; import run.halo.app.extension.router.selector.FieldSelector; import run.halo.app.extension.router.selector.LabelSelector; @@ -155,12 +154,13 @@ public class IndexedQueryEngineImpl implements IndexedQueryEngine { stopWatch.stop(); stopWatch.start("retrieve matched metadata names"); - var hasLabelSelector = hasLabelSelector(options.getLabelSelector()); - final List matchedByLabels = hasLabelSelector - ? retrieveForLabelMatchers(options.getLabelSelector().getMatchers(), fieldPathEntryMap, - allMetadataNames) - : allMetadataNames; - indexView.removeByIdNotIn(new TreeSet<>(matchedByLabels)); + if (hasLabelSelector(options.getLabelSelector())) { + var matchedByLabels = retrieveForLabelMatchers(options.getLabelSelector().getMatchers(), + fieldPathEntryMap, allMetadataNames); + if (allMetadataNames.size() != matchedByLabels.size()) { + indexView.removeByIdNotIn(new TreeSet<>(matchedByLabels)); + } + } stopWatch.stop(); stopWatch.start("retrieve matched metadata names by fields"); @@ -188,8 +188,6 @@ public class IndexedQueryEngineImpl implements IndexedQueryEngine { } boolean hasFieldSelector(FieldSelector fieldSelector) { - return fieldSelector != null - && fieldSelector.query() != null - && !(fieldSelector.query() instanceof All); + return fieldSelector != null; } } From 7341f9de6c707920ce3ab9152f7d37d96c8115de Mon Sep 17 00:00:00 2001 From: guqing <38999863+guqing@users.noreply.github.com> Date: Mon, 5 Feb 2024 14:54:13 +0800 Subject: [PATCH 3/3] chore: remove unnecessary indices build process for lucene on startup (#5332) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### What type of PR is this? /kind cleanup /area core /milestone 2.13.x #### What this PR does / why we need it: 移除启动时不必要的搜索引擎索引构建步骤 #### Does this PR introduce a user-facing change? ```release-note 移除启动时不必要的搜索引擎索引构建步骤 ``` --- .../src/main/java/run/halo/app/search/IndicesInitializer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/run/halo/app/search/IndicesInitializer.java b/application/src/main/java/run/halo/app/search/IndicesInitializer.java index cbc54a498..b90dcffa1 100644 --- a/application/src/main/java/run/halo/app/search/IndicesInitializer.java +++ b/application/src/main/java/run/halo/app/search/IndicesInitializer.java @@ -5,11 +5,11 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Component; import org.springframework.util.StopWatch; @Slf4j -@Component +// @Component +// TODO Remove this class on next version public class IndicesInitializer { private final IndicesService indicesService;