fix: failure in data query due to reconciler triggered by uncommitted transaction (#5323)

#### What type of PR is this?
/kind bugfix
/milestone 2.12.x
/area core

#### What this PR does / why we need it:
修复事务未提交便触发控制器执行可能导致数据状态不正确的问题

**how to test it?**
1. 测试如 #5315 的问题是否还存在
2. 测试添加重名自定义模型对象会抛出异常且数据被回滚

#### Which issue(s) this PR fixes:
Fixes #5315

#### Does this PR introduce a user-facing change?
```release-note
修复事务未提交便触发控制器执行可能导致数据状态不正确的问题
```
pull/5332/head
guqing 2024-02-05 12:09:07 +08:00 committed by GitHub
parent b4e196372d
commit dcef5d4157
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 7 deletions

View File

@ -26,7 +26,8 @@ import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.util.Predicates;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
@ -40,7 +41,6 @@ import run.halo.app.extension.store.ReactiveExtensionStoreClient;
@Slf4j
@Component
@RequiredArgsConstructor
public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
private final ReactiveExtensionStoreClient client;
@ -60,6 +60,28 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
private final ConcurrentMap<GroupKind, AtomicBoolean> indexBuildingState =
new ConcurrentHashMap<>();
private TransactionalOperator transactionalOperator;
public ReactiveExtensionClientImpl(ReactiveExtensionStoreClient client,
ExtensionConverter converter, SchemeManager schemeManager, ObjectMapper objectMapper,
IndexerFactory indexerFactory, IndexedQueryEngine indexedQueryEngine,
ReactiveTransactionManager reactiveTransactionManager) {
this.client = client;
this.converter = converter;
this.schemeManager = schemeManager;
this.objectMapper = objectMapper;
this.indexerFactory = indexerFactory;
this.indexedQueryEngine = indexedQueryEngine;
this.transactionalOperator = TransactionalOperator.create(reactiveTransactionManager);
}
/**
* Only for test.
*/
void setTransactionalOperator(TransactionalOperator transactionalOperator) {
this.transactionalOperator = transactionalOperator;
}
@Override
public <E extends Extension> Flux<E> list(Class<E> type, Predicate<E> predicate,
Comparator<E> comparator) {
@ -151,7 +173,6 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
}
@Override
@Transactional
public <E extends Extension> Mono<E> create(E extension) {
checkClientWritable(extension);
return Mono.just(extension)
@ -185,7 +206,6 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
}
@Override
@Transactional
public <E extends Extension> Mono<E> update(E extension) {
checkClientWritable(extension);
// Refactor the atomic reference if we have a better solution.
@ -223,7 +243,6 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
}
@Override
@Transactional
public <E extends Extension> Mono<E> delete(E extension) {
checkClientWritable(extension);
// set deletionTimestamp
@ -247,7 +266,8 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
var indexer = indexerFactory.getIndexer(gvk);
return client.create(name, data)
.map(created -> converter.convertFrom(type, created))
.doOnNext(indexer::indexRecord);
.doOnNext(indexer::indexRecord)
.as(transactionalOperator::transactional);
});
}
@ -258,7 +278,8 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
var indexer = indexerFactory.getIndexer(oldExtension.groupVersionKind());
return client.update(name, version, data)
.map(updated -> converter.convertFrom(type, updated))
.doOnNext(indexer::updateRecord);
.doOnNext(indexer::updateRecord)
.as(transactionalOperator::transactional);
});
}

View File

@ -36,6 +36,8 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -63,6 +65,9 @@ class ReactiveExtensionClientTest {
@Mock
IndexerFactory indexerFactory;
@Mock
ReactiveTransactionManager reactiveTransactionManager;
@Spy
ObjectMapper objectMapper = JsonMapper.builder()
.addModule(new JavaTimeModule())
@ -76,6 +81,10 @@ class ReactiveExtensionClientTest {
lenient().when(schemeManager.get(eq(FakeExtension.class)))
.thenReturn(fakeScheme);
lenient().when(schemeManager.get(eq(fakeScheme.groupVersionKind()))).thenReturn(fakeScheme);
var transactionalOperator = mock(TransactionalOperator.class);
client.setTransactionalOperator(transactionalOperator);
lenient().when(transactionalOperator.transactional(any(Mono.class)))
.thenAnswer(invocation -> invocation.getArgument(0));
}
FakeExtension createFakeExtension(String name, Long version) {