From dcef5d4157ab6f8ac193f3e0a2b055017fa26312 Mon Sep 17 00:00:00 2001 From: guqing <38999863+guqing@users.noreply.github.com> Date: Mon, 5 Feb 2024 12:09:07 +0800 Subject: [PATCH] fix: failure in data query due to reconciler triggered by uncommitted transaction (#5323) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### What type of PR is this? /kind bugfix /milestone 2.12.x /area core #### What this PR does / why we need it: 修复事务未提交便触发控制器执行可能导致数据状态不正确的问题 **how to test it?** 1. 测试如 #5315 的问题是否还存在 2. 测试添加重名自定义模型对象会抛出异常且数据被回滚 #### Which issue(s) this PR fixes: Fixes #5315 #### Does this PR introduce a user-facing change? ```release-note 修复事务未提交便触发控制器执行可能导致数据状态不正确的问题 ``` --- .../ReactiveExtensionClientImpl.java | 35 +++++++++++++++---- .../ReactiveExtensionClientTest.java | 9 +++++ 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java index 2bbe08ad0..c821e4491 100644 --- a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java +++ b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java @@ -26,7 +26,8 @@ import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; import org.springframework.data.util.Predicates; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @@ -40,7 +41,6 @@ import run.halo.app.extension.store.ReactiveExtensionStoreClient; @Slf4j @Component -@RequiredArgsConstructor public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { private final ReactiveExtensionStoreClient client; @@ -60,6 +60,28 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { private final ConcurrentMap indexBuildingState = new ConcurrentHashMap<>(); + private TransactionalOperator transactionalOperator; + + public ReactiveExtensionClientImpl(ReactiveExtensionStoreClient client, + ExtensionConverter converter, SchemeManager schemeManager, ObjectMapper objectMapper, + IndexerFactory indexerFactory, IndexedQueryEngine indexedQueryEngine, + ReactiveTransactionManager reactiveTransactionManager) { + this.client = client; + this.converter = converter; + this.schemeManager = schemeManager; + this.objectMapper = objectMapper; + this.indexerFactory = indexerFactory; + this.indexedQueryEngine = indexedQueryEngine; + this.transactionalOperator = TransactionalOperator.create(reactiveTransactionManager); + } + + /** + * Only for test. + */ + void setTransactionalOperator(TransactionalOperator transactionalOperator) { + this.transactionalOperator = transactionalOperator; + } + @Override public Flux list(Class type, Predicate predicate, Comparator comparator) { @@ -151,7 +173,6 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { } @Override - @Transactional public Mono create(E extension) { checkClientWritable(extension); return Mono.just(extension) @@ -185,7 +206,6 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { } @Override - @Transactional public Mono update(E extension) { checkClientWritable(extension); // Refactor the atomic reference if we have a better solution. @@ -223,7 +243,6 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { } @Override - @Transactional public Mono delete(E extension) { checkClientWritable(extension); // set deletionTimestamp @@ -247,7 +266,8 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { var indexer = indexerFactory.getIndexer(gvk); return client.create(name, data) .map(created -> converter.convertFrom(type, created)) - .doOnNext(indexer::indexRecord); + .doOnNext(indexer::indexRecord) + .as(transactionalOperator::transactional); }); } @@ -258,7 +278,8 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { var indexer = indexerFactory.getIndexer(oldExtension.groupVersionKind()); return client.update(name, version, data) .map(updated -> converter.convertFrom(type, updated)) - .doOnNext(indexer::updateRecord); + .doOnNext(indexer::updateRecord) + .as(transactionalOperator::transactional); }); } diff --git a/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java b/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java index dc7cd9528..c46e37700 100644 --- a/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java +++ b/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java @@ -36,6 +36,8 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -63,6 +65,9 @@ class ReactiveExtensionClientTest { @Mock IndexerFactory indexerFactory; + @Mock + ReactiveTransactionManager reactiveTransactionManager; + @Spy ObjectMapper objectMapper = JsonMapper.builder() .addModule(new JavaTimeModule()) @@ -76,6 +81,10 @@ class ReactiveExtensionClientTest { lenient().when(schemeManager.get(eq(FakeExtension.class))) .thenReturn(fakeScheme); lenient().when(schemeManager.get(eq(fakeScheme.groupVersionKind()))).thenReturn(fakeScheme); + var transactionalOperator = mock(TransactionalOperator.class); + client.setTransactionalOperator(transactionalOperator); + lenient().when(transactionalOperator.transactional(any(Mono.class))) + .thenAnswer(invocation -> invocation.getArgument(0)); } FakeExtension createFakeExtension(String name, Long version) {