Merge remote-tracking branch 'upstream/main' into refactor/console-as-subproject

pull/5314/head
John Niang 2024-02-05 23:40:07 +08:00
commit 29ec459eac
5 changed files with 49 additions and 34 deletions

View File

@ -26,7 +26,8 @@ import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort; import org.springframework.data.domain.Sort;
import org.springframework.data.util.Predicates; import org.springframework.data.util.Predicates;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.retry.Retry; import reactor.util.retry.Retry;
@ -40,7 +41,6 @@ import run.halo.app.extension.store.ReactiveExtensionStoreClient;
@Slf4j @Slf4j
@Component @Component
@RequiredArgsConstructor
public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
private final ReactiveExtensionStoreClient client; private final ReactiveExtensionStoreClient client;
@ -60,6 +60,28 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
private final ConcurrentMap<GroupKind, AtomicBoolean> indexBuildingState = private final ConcurrentMap<GroupKind, AtomicBoolean> indexBuildingState =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private TransactionalOperator transactionalOperator;
public ReactiveExtensionClientImpl(ReactiveExtensionStoreClient client,
ExtensionConverter converter, SchemeManager schemeManager, ObjectMapper objectMapper,
IndexerFactory indexerFactory, IndexedQueryEngine indexedQueryEngine,
ReactiveTransactionManager reactiveTransactionManager) {
this.client = client;
this.converter = converter;
this.schemeManager = schemeManager;
this.objectMapper = objectMapper;
this.indexerFactory = indexerFactory;
this.indexedQueryEngine = indexedQueryEngine;
this.transactionalOperator = TransactionalOperator.create(reactiveTransactionManager);
}
/**
* Only for test.
*/
void setTransactionalOperator(TransactionalOperator transactionalOperator) {
this.transactionalOperator = transactionalOperator;
}
@Override @Override
public <E extends Extension> Flux<E> list(Class<E> type, Predicate<E> predicate, public <E extends Extension> Flux<E> list(Class<E> type, Predicate<E> predicate,
Comparator<E> comparator) { Comparator<E> comparator) {
@ -151,7 +173,6 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
} }
@Override @Override
@Transactional
public <E extends Extension> Mono<E> create(E extension) { public <E extends Extension> Mono<E> create(E extension) {
checkClientWritable(extension); checkClientWritable(extension);
return Mono.just(extension) return Mono.just(extension)
@ -185,7 +206,6 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
} }
@Override @Override
@Transactional
public <E extends Extension> Mono<E> update(E extension) { public <E extends Extension> Mono<E> update(E extension) {
checkClientWritable(extension); checkClientWritable(extension);
// Refactor the atomic reference if we have a better solution. // Refactor the atomic reference if we have a better solution.
@ -223,7 +243,6 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
} }
@Override @Override
@Transactional
public <E extends Extension> Mono<E> delete(E extension) { public <E extends Extension> Mono<E> delete(E extension) {
checkClientWritable(extension); checkClientWritable(extension);
// set deletionTimestamp // set deletionTimestamp
@ -247,7 +266,8 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
var indexer = indexerFactory.getIndexer(gvk); var indexer = indexerFactory.getIndexer(gvk);
return client.create(name, data) return client.create(name, data)
.map(created -> converter.convertFrom(type, created)) .map(created -> converter.convertFrom(type, created))
.doOnNext(indexer::indexRecord); .doOnNext(indexer::indexRecord)
.as(transactionalOperator::transactional);
}); });
} }
@ -258,7 +278,8 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
var indexer = indexerFactory.getIndexer(oldExtension.groupVersionKind()); var indexer = indexerFactory.getIndexer(oldExtension.groupVersionKind());
return client.update(name, version, data) return client.update(name, version, data)
.map(updated -> converter.convertFrom(type, updated)) .map(updated -> converter.convertFrom(type, updated))
.doOnNext(indexer::updateRecord); .doOnNext(indexer::updateRecord)
.as(transactionalOperator::transactional);
}); });
} }

View File

@ -1,9 +1,6 @@
package run.halo.app.extension.gc; package run.halo.app.extension.gc;
import static run.halo.app.extension.Comparators.compareCreationTimestamp;
import java.util.List; import java.util.List;
import java.util.function.Predicate;
import org.springframework.data.domain.Sort; import org.springframework.data.domain.Sort;
import run.halo.app.extension.Extension; import run.halo.app.extension.Extension;
import run.halo.app.extension.ExtensionClient; import run.halo.app.extension.ExtensionClient;
@ -64,8 +61,6 @@ class GcSynchronizer implements Synchronizer<GcRequest> {
if (event instanceof SchemeRegistered registeredEvent) { if (event instanceof SchemeRegistered registeredEvent) {
var newScheme = registeredEvent.getNewScheme(); var newScheme = registeredEvent.getNewScheme();
listDeleted(newScheme.type()).forEach(watcher::onDelete); listDeleted(newScheme.type()).forEach(watcher::onDelete);
client.list(newScheme.type(), deleted(), compareCreationTimestamp(true))
.forEach(watcher::onDelete);
} }
}); });
client.watch(watcher); client.watch(watcher);
@ -77,16 +72,8 @@ class GcSynchronizer implements Synchronizer<GcRequest> {
<E extends Extension> List<E> listDeleted(Class<E> type) { <E extends Extension> List<E> listDeleted(Class<E> type) {
var options = new ListOptions() var options = new ListOptions()
.setFieldSelector( .setFieldSelector(
FieldSelector.of(QueryFactory.all("metadata.deletionTimestamp")) FieldSelector.of(QueryFactory.isNotNull("metadata.deletionTimestamp"))
); );
return client.listAll(type, options, Sort.by("metadata.creationTimestamp")) return client.listAll(type, options, Sort.by(Sort.Order.asc("metadata.creationTimestamp")));
.stream()
.sorted(compareCreationTimestamp(true))
.toList();
} }
private <E extends Extension> Predicate<E> deleted() {
return extension -> extension.getMetadata().getDeletionTimestamp() != null;
}
} }

View File

@ -20,7 +20,6 @@ import run.halo.app.extension.GroupVersionKind;
import run.halo.app.extension.ListOptions; import run.halo.app.extension.ListOptions;
import run.halo.app.extension.ListResult; import run.halo.app.extension.ListResult;
import run.halo.app.extension.PageRequest; import run.halo.app.extension.PageRequest;
import run.halo.app.extension.index.query.All;
import run.halo.app.extension.index.query.QueryIndexViewImpl; import run.halo.app.extension.index.query.QueryIndexViewImpl;
import run.halo.app.extension.router.selector.FieldSelector; import run.halo.app.extension.router.selector.FieldSelector;
import run.halo.app.extension.router.selector.LabelSelector; import run.halo.app.extension.router.selector.LabelSelector;
@ -155,12 +154,13 @@ public class IndexedQueryEngineImpl implements IndexedQueryEngine {
stopWatch.stop(); stopWatch.stop();
stopWatch.start("retrieve matched metadata names"); stopWatch.start("retrieve matched metadata names");
var hasLabelSelector = hasLabelSelector(options.getLabelSelector()); if (hasLabelSelector(options.getLabelSelector())) {
final List<String> matchedByLabels = hasLabelSelector var matchedByLabels = retrieveForLabelMatchers(options.getLabelSelector().getMatchers(),
? retrieveForLabelMatchers(options.getLabelSelector().getMatchers(), fieldPathEntryMap, fieldPathEntryMap, allMetadataNames);
allMetadataNames) if (allMetadataNames.size() != matchedByLabels.size()) {
: allMetadataNames;
indexView.removeByIdNotIn(new TreeSet<>(matchedByLabels)); indexView.removeByIdNotIn(new TreeSet<>(matchedByLabels));
}
}
stopWatch.stop(); stopWatch.stop();
stopWatch.start("retrieve matched metadata names by fields"); stopWatch.start("retrieve matched metadata names by fields");
@ -188,8 +188,6 @@ public class IndexedQueryEngineImpl implements IndexedQueryEngine {
} }
boolean hasFieldSelector(FieldSelector fieldSelector) { boolean hasFieldSelector(FieldSelector fieldSelector) {
return fieldSelector != null return fieldSelector != null;
&& fieldSelector.query() != null
&& !(fieldSelector.query() instanceof All);
} }
} }

View File

@ -5,11 +5,11 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch; import org.springframework.util.StopWatch;
@Slf4j @Slf4j
@Component // @Component
// TODO Remove this class on next version
public class IndicesInitializer { public class IndicesInitializer {
private final IndicesService indicesService; private final IndicesService indicesService;

View File

@ -36,6 +36,8 @@ import org.mockito.Mock;
import org.mockito.Spy; import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.dao.DataIntegrityViolationException; import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.Exceptions; import reactor.core.Exceptions;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -63,6 +65,9 @@ class ReactiveExtensionClientTest {
@Mock @Mock
IndexerFactory indexerFactory; IndexerFactory indexerFactory;
@Mock
ReactiveTransactionManager reactiveTransactionManager;
@Spy @Spy
ObjectMapper objectMapper = JsonMapper.builder() ObjectMapper objectMapper = JsonMapper.builder()
.addModule(new JavaTimeModule()) .addModule(new JavaTimeModule())
@ -76,6 +81,10 @@ class ReactiveExtensionClientTest {
lenient().when(schemeManager.get(eq(FakeExtension.class))) lenient().when(schemeManager.get(eq(FakeExtension.class)))
.thenReturn(fakeScheme); .thenReturn(fakeScheme);
lenient().when(schemeManager.get(eq(fakeScheme.groupVersionKind()))).thenReturn(fakeScheme); lenient().when(schemeManager.get(eq(fakeScheme.groupVersionKind()))).thenReturn(fakeScheme);
var transactionalOperator = mock(TransactionalOperator.class);
client.setTransactionalOperator(transactionalOperator);
lenient().when(transactionalOperator.transactional(any(Mono.class)))
.thenAnswer(invocation -> invocation.getArgument(0));
} }
FakeExtension createFakeExtension(String name, Long version) { FakeExtension createFakeExtension(String name, Long version) {